This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new 5f4d733b3 [#4291] feat(catalog-lakehouse-paimon): Support Paimon table with primary keys in Gravitino (#4383)
5f4d733b3 is described below

commit 5f4d733b38572367a21bcb9db3bb948111dc6bf0
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Aug 6 10:26:48 2024 +0800

    [#4291] feat(catalog-lakehouse-paimon): Support Paimon table with primary keys in Gravitino (#4383)
    
    ### What changes were proposed in this pull request?
    
    Support Paimon table with primary keys in Gravitino
    
    ### Why are the changes needed?
    
    Fix: https://github.com/apache/gravitino/issues/4291
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    New UTs and ITs.
    
    Co-authored-by: cai can <[email protected]>
    Co-authored-by: caican <[email protected]>
---
 .../lakehouse/paimon/GravitinoPaimonTable.java     | 113 +++++++++++++++--
 .../lakehouse/paimon/PaimonCatalogOperations.java  |  25 +++-
 .../lakehouse/paimon/TestGravitinoPaimonTable.java | 141 ++++++++++++++++++++-
 .../integration/test/CatalogPaimonBaseIT.java      | 113 +++++++++++++++++
 4 files changed, 370 insertions(+), 22 deletions(-)

diff --git a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java
index 6df977548..2853abbbe 100644
--- a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java
+++ b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java
@@ -18,12 +18,19 @@
  */
 package org.apache.gravitino.catalog.lakehouse.paimon;
 
+import static 
org.apache.gravitino.catalog.lakehouse.paimon.PaimonTablePropertiesMetadata.COMMENT;
+import static 
org.apache.gravitino.dto.rel.partitioning.Partitioning.EMPTY_PARTITIONING;
 import static org.apache.gravitino.meta.AuditInfo.EMPTY;
+import static org.apache.gravitino.rel.indexes.Indexes.primary;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 import lombok.Getter;
 import lombok.ToString;
@@ -32,6 +39,7 @@ import org.apache.gravitino.connector.TableOperations;
 import org.apache.gravitino.rel.expressions.NamedReference;
 import org.apache.gravitino.rel.expressions.transforms.Transform;
 import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Index;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.types.DataField;
@@ -44,6 +52,9 @@ import org.apache.paimon.types.DataField;
 @Getter
 public class GravitinoPaimonTable extends BaseTable {
 
+  @VisibleForTesting
+  public static final String PAIMON_PRIMARY_KEY_INDEX_NAME = 
"PAIMON_PRIMARY_KEY_INDEX";
+
   private GravitinoPaimonTable() {}
 
   @Override
@@ -58,20 +69,23 @@ public class GravitinoPaimonTable extends BaseTable {
    * @return The converted Paimon table.
    */
   public Schema toPaimonTableSchema() {
-    Schema.Builder builder = 
Schema.newBuilder().comment(comment).options(properties);
-    if (partitioning != null) {
-      builder.partitionKeys(
-          Arrays.stream(partitioning)
-              .map(
-                  partition -> {
-                    NamedReference[] references = partition.references();
-                    Preconditions.checkArgument(
-                        references.length == 1,
-                        "Partitioning column must be single-column, like 
'a'.");
-                    return references[0].toString();
-                  })
-              .collect(Collectors.toList()));
+    Schema.Builder builder = Schema.newBuilder().comment(comment);
+    if (properties == null) {
+      properties = Maps.newHashMap();
+    }
+    if (partitioning == null) {
+      partitioning = EMPTY_PARTITIONING;
     }
+
+    Map<String, String> normalizedProperties = new HashMap<>(properties);
+    normalizedProperties.remove(COMMENT);
+
+    List<String> partitionKeys = getPartitionKeys(partitioning);
+    List<String> primaryKeys = getPrimaryKeysFromIndexes(indexes);
+
+    validate(primaryKeys, partitionKeys);
+
+    
builder.options(normalizedProperties).primaryKey(primaryKeys).partitionKeys(partitionKeys);
     for (int index = 0; index < columns.length; index++) {
       DataField dataField = GravitinoPaimonColumn.toPaimonColumn(index, 
columns[index]);
       builder.column(dataField.name(), dataField.type(), 
dataField.description());
@@ -94,6 +108,7 @@ public class GravitinoPaimonTable extends BaseTable {
         .withPartitioning(toGravitinoPartitioning(table.partitionKeys()))
         .withComment(table.comment().orElse(null))
         .withProperties(table.options())
+        .withIndexes(constructIndexesFromPrimaryKeys(table))
         .withAuditInfo(EMPTY)
         .build();
   }
@@ -102,6 +117,77 @@ public class GravitinoPaimonTable extends BaseTable {
     return 
partitionKeys.stream().map(Transforms::identity).toArray(Transform[]::new);
   }
 
+  private static List<String> getPartitionKeys(Transform[] partitioning) {
+    if (partitioning == null) {
+      return Collections.emptyList();
+    }
+
+    return Arrays.stream(partitioning)
+        .map(
+            partition -> {
+              NamedReference[] references = partition.references();
+              Preconditions.checkArgument(
+                  references.length == 1, "Partitioning column must be 
single-column, like 'a'.");
+              return references[0].toString();
+            })
+        .collect(Collectors.toList());
+  }
+
+  private List<String> getPrimaryKeysFromIndexes(Index[] indexes) {
+    if (indexes == null || indexes.length == 0) {
+      return Collections.emptyList();
+    }
+
+    Preconditions.checkArgument(
+        indexes.length == 1, "Paimon only supports no more than one Index.");
+
+    Index primaryKeyIndex = indexes[0];
+    Arrays.stream(primaryKeyIndex.fieldNames())
+        .forEach(
+            filedName ->
+                Preconditions.checkArgument(
+                    filedName != null && filedName.length == 1,
+                    "The primary key columns should not be nested."));
+
+    return Arrays.stream(primaryKeyIndex.fieldNames())
+        .map(fieldName -> fieldName[0])
+        .collect(Collectors.toList());
+  }
+
+  private static Index[] constructIndexesFromPrimaryKeys(Table table) {
+    Index[] indexes = new Index[0];
+    if (table.primaryKeys() != null && !table.primaryKeys().isEmpty()) {
+      String[][] filedNames = constructIndexFiledNames(table.primaryKeys());
+      indexes =
+          Collections.singletonList(primary(PAIMON_PRIMARY_KEY_INDEX_NAME, 
filedNames))
+              .toArray(new Index[0]);
+    }
+    return indexes;
+  }
+
+  private static String[][] constructIndexFiledNames(List<String> primaryKeys) 
{
+    return primaryKeys.stream()
+        .map(pk -> new String[] {pk})
+        .collect(Collectors.toList())
+        .toArray(new String[0][0]);
+  }
+
+  private static void validate(List<String> primaryKeys, List<String> 
partitionKeys) {
+    if (!primaryKeys.isEmpty()) {
+      List<String> adjusted =
+          primaryKeys.stream()
+              .filter(pk -> !partitionKeys.contains(pk))
+              .collect(Collectors.toList());
+
+      Preconditions.checkState(
+          !adjusted.isEmpty(),
+          String.format(
+              "Paimon Table Primary key constraint %s should not be same with 
partition fields %s,"
+                  + " this will result in only one record in a partition.",
+              primaryKeys, partitionKeys));
+    }
+  }
+
   /** A builder class for constructing {@link GravitinoPaimonTable} instance. 
*/
   public static class Builder extends BaseTableBuilder<Builder, 
GravitinoPaimonTable> {
 
@@ -121,6 +207,7 @@ public class GravitinoPaimonTable extends BaseTable {
       paimonTable.columns = columns;
       paimonTable.partitioning = partitioning;
       paimonTable.properties = properties == null ? Maps.newHashMap() : 
Maps.newHashMap(properties);
+      paimonTable.indexes = indexes;
       paimonTable.auditInfo = auditInfo;
       return paimonTable;
     }
diff --git a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogOperations.java b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogOperations.java
index 57095e237..68a5ebcff 100644
--- a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogOperations.java
+++ b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogOperations.java
@@ -22,6 +22,7 @@ import static 
org.apache.gravitino.catalog.lakehouse.paimon.GravitinoPaimonTable
 import static 
org.apache.gravitino.catalog.lakehouse.paimon.PaimonSchema.fromPaimonProperties;
 import static org.apache.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;
 import static 
org.apache.gravitino.rel.expressions.transforms.Transforms.EMPTY_TRANSFORM;
+import static org.apache.gravitino.rel.indexes.Indexes.EMPTY_INDEXES;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -325,6 +326,9 @@ public class PaimonCatalogOperations implements 
CatalogOperations, SupportsSchem
     if (partitioning == null) {
       partitioning = EMPTY_TRANSFORM;
     }
+    if (indexes == null) {
+      indexes = EMPTY_INDEXES;
+    }
     Preconditions.checkArgument(
         Arrays.stream(partitioning)
             .allMatch(
@@ -334,15 +338,13 @@ public class PaimonCatalogOperations implements 
CatalogOperations, SupportsSchem
                       && references[0] instanceof 
NamedReference.FieldReference;
                 }),
         "Paimon partition columns should not be nested.");
-    Preconditions.checkArgument(
-        sortOrders == null || sortOrders.length == 0,
-        "Sort orders are not supported for Paimon in Gravitino.");
-    Preconditions.checkArgument(
-        indexes == null || indexes.length == 0,
-        "Indexes are not supported for Paimon in Gravitino.");
     Preconditions.checkArgument(
         distribution == null || distribution.strategy() == 
Distributions.NONE.strategy(),
         "Distribution is not supported for Paimon in Gravitino now.");
+    Preconditions.checkArgument(
+        sortOrders == null || sortOrders.length == 0,
+        "Sort orders are not supported for Paimon in Gravitino.");
+    checkPaimonIndexes(indexes);
     String currentUser = currentUser();
     GravitinoPaimonTable createdTable =
         GravitinoPaimonTable.builder()
@@ -366,6 +368,7 @@ public class PaimonCatalogOperations implements 
CatalogOperations, SupportsSchem
             .withPartitioning(partitioning)
             .withComment(comment)
             .withProperties(properties)
+            .withIndexes(indexes)
             .withAuditInfo(
                 
AuditInfo.builder().withCreator(currentUser).withCreateTime(Instant.now()).build())
             .build();
@@ -457,4 +460,14 @@ public class PaimonCatalogOperations implements 
CatalogOperations, SupportsSchem
     String[] levels = identifier.namespace().levels();
     return NameIdentifier.of(levels[levels.length - 1], identifier.name());
   }
+
+  private void checkPaimonIndexes(Index[] indexes) {
+    Preconditions.checkArgument(indexes.length <= 1, "Paimon supports no more 
than one Index.");
+    Arrays.stream(indexes)
+        .forEach(
+            index ->
+                Preconditions.checkArgument(
+                    index.type() == Index.IndexType.PRIMARY_KEY,
+                    "Paimon only supports primary key Index."));
+  }
 }
diff --git a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/TestGravitinoPaimonTable.java b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/TestGravitinoPaimonTable.java
index e4dc306ae..a34c27957 100644
--- a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/TestGravitinoPaimonTable.java
+++ b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/TestGravitinoPaimonTable.java
@@ -19,9 +19,12 @@
 package org.apache.gravitino.catalog.lakehouse.paimon;
 
 import static 
org.apache.gravitino.catalog.lakehouse.paimon.GravitinoPaimonColumn.fromPaimonColumn;
+import static 
org.apache.gravitino.catalog.lakehouse.paimon.GravitinoPaimonTable.PAIMON_PRIMARY_KEY_INDEX_NAME;
 import static 
org.apache.gravitino.catalog.lakehouse.paimon.TestPaimonCatalog.PAIMON_PROPERTIES_METADATA;
 import static 
org.apache.gravitino.catalog.lakehouse.paimon.utils.TableOpsUtils.checkColumnCapability;
+import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
 import static 
org.apache.gravitino.rel.expressions.transforms.Transforms.identity;
+import static org.apache.gravitino.rel.indexes.Indexes.primary;
 
 import com.google.common.collect.Maps;
 import java.io.File;
@@ -29,6 +32,7 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.time.Instant;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -50,6 +54,7 @@ import org.apache.gravitino.rel.expressions.NamedReference;
 import org.apache.gravitino.rel.expressions.distributions.Distributions;
 import org.apache.gravitino.rel.expressions.sorts.SortOrder;
 import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.indexes.Index;
 import org.apache.gravitino.rel.types.Types;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.types.DataField;
@@ -263,6 +268,73 @@ public class TestGravitinoPaimonTable {
     Assertions.assertArrayEquals(partitionKeys, loadedPartitionKeys);
   }
 
+  @Test
+  void testCreatePaimonPrimaryKeyTable() {
+    String paimonTableName = "test_paimon_primary_key_table";
+    NameIdentifier tableIdentifier = NameIdentifier.of(paimonSchema.name(), 
paimonTableName);
+    Map<String, String> properties = Maps.newHashMap();
+
+    GravitinoPaimonColumn col1 =
+        fromPaimonColumn(new DataField(0, "col_1", DataTypes.INT().notNull(), 
PAIMON_COMMENT));
+    GravitinoPaimonColumn col2 =
+        fromPaimonColumn(new DataField(1, "col_2", 
DataTypes.STRING().notNull(), PAIMON_COMMENT));
+    GravitinoPaimonColumn col3 =
+        fromPaimonColumn(new DataField(2, "col_3", 
DataTypes.STRING().nullable(), PAIMON_COMMENT));
+    Column[] columns = new Column[] {col1, col2, col3};
+
+    Transform[] transforms = new Transform[] {identity("col_1")};
+    String[] partitionKeys = new String[] {"col_1"};
+    Index[] indexes =
+        Collections.singletonList(
+                primary(PAIMON_PRIMARY_KEY_INDEX_NAME, new String[][] {new 
String[] {"col_2"}}))
+            .toArray(new Index[0]);
+
+    Table table =
+        paimonCatalogOperations.createTable(
+            tableIdentifier,
+            columns,
+            PAIMON_COMMENT,
+            properties,
+            transforms,
+            Distributions.NONE,
+            new SortOrder[0],
+            indexes);
+
+    Assertions.assertEquals(tableIdentifier.name(), table.name());
+    Assertions.assertEquals(PAIMON_COMMENT, table.comment());
+    Assertions.assertEquals(properties, table.properties());
+    Assertions.assertArrayEquals(transforms, table.partitioning());
+    Assertions.assertEquals(indexes.length, table.index().length);
+    for (int i = 0; i < indexes.length; i++) {
+      Assertions.assertEquals(indexes[i].name(), table.index()[i].name());
+      Assertions.assertEquals(indexes[i].type(), table.index()[i].type());
+      Assertions.assertEquals(indexes[i].fieldNames(), 
table.index()[i].fieldNames());
+    }
+
+    Table loadedTable = paimonCatalogOperations.loadTable(tableIdentifier);
+    Assertions.assertArrayEquals(transforms, loadedTable.partitioning());
+    String[] loadedPartitionKeys =
+        Arrays.stream(loadedTable.partitioning())
+            .map(
+                transform -> {
+                  NamedReference[] references = transform.references();
+                  Assertions.assertTrue(
+                      references.length == 1
+                          && references[0] instanceof 
NamedReference.FieldReference);
+                  NamedReference.FieldReference fieldReference =
+                      (NamedReference.FieldReference) references[0];
+                  return fieldReference.fieldName()[0];
+                })
+            .toArray(String[]::new);
+    Assertions.assertArrayEquals(partitionKeys, loadedPartitionKeys);
+    Assertions.assertEquals(indexes.length, loadedTable.index().length);
+    for (int i = 0; i < indexes.length; i++) {
+      Assertions.assertEquals(indexes[i].name(), 
loadedTable.index()[i].name());
+      Assertions.assertEquals(indexes[i].type(), 
loadedTable.index()[i].type());
+      Assertions.assertArrayEquals(indexes[i].fieldNames(), 
loadedTable.index()[i].fieldNames());
+    }
+  }
+
   @Test
   void testDropPaimonTable() {
     NameIdentifier tableIdentifier = NameIdentifier.of(paimonSchema.name(), 
genRandomName());
@@ -389,7 +461,7 @@ public class TestGravitinoPaimonTable {
     Assertions.assertEquals(new IntType().nullable(), 
paimonTableSchema.fields().get(0).type());
     Assertions.assertEquals(new DateType().nullable(), 
paimonTableSchema.fields().get(1).type());
     Assertions.assertEquals(
-        new VarCharType(Integer.MAX_VALUE).nullable(), 
paimonTableSchema.fields().get(2).type());
+        new VarCharType(Integer.MAX_VALUE).notNull(), 
paimonTableSchema.fields().get(2).type());
   }
 
   @Test
@@ -444,7 +516,68 @@ public class TestGravitinoPaimonTable {
     Assertions.assertEquals(new IntType().nullable(), 
paimonTableSchema.fields().get(0).type());
     Assertions.assertEquals(new DateType().nullable(), 
paimonTableSchema.fields().get(1).type());
     Assertions.assertEquals(
-        new VarCharType(Integer.MAX_VALUE).nullable(), 
paimonTableSchema.fields().get(2).type());
+        new VarCharType(Integer.MAX_VALUE).notNull(), 
paimonTableSchema.fields().get(2).type());
+  }
+
+  @Test
+  public void testGravitinoToPaimonTableWithPrimaryKey() {
+    Column[] columns = createColumns();
+    NameIdentifier identifier = NameIdentifier.of("test_schema", 
"test_primary_key_table");
+    Map<String, String> properties = Maps.newHashMap();
+
+    Transform[] partitions = new Transform[] {identity(columns[0].name())};
+    List<String> partitionKeys = Collections.singletonList(columns[0].name());
+    List<String> primaryKeys = Collections.singletonList(columns[2].name());
+    Index[] indexes =
+        Collections.singletonList(
+                primary(
+                    PAIMON_PRIMARY_KEY_INDEX_NAME,
+                    new String[][] {new String[] {columns[2].name()}}))
+            .toArray(new Index[0]);
+
+    GravitinoPaimonTable gravitinoPaimonTable =
+        GravitinoPaimonTable.builder()
+            .withName(identifier.name())
+            .withColumns(
+                Arrays.stream(columns)
+                    .map(
+                        column -> {
+                          checkColumnCapability(
+                              column.name(), column.defaultValue(), 
column.autoIncrement());
+                          return GravitinoPaimonColumn.builder()
+                              .withName(column.name())
+                              .withType(column.dataType())
+                              .withComment(column.comment())
+                              .withNullable(column.nullable())
+                              .withAutoIncrement(column.autoIncrement())
+                              .withDefaultValue(column.defaultValue())
+                              .build();
+                        })
+                    .toArray(GravitinoPaimonColumn[]::new))
+            .withPartitioning(partitions)
+            .withComment("test_table_comment")
+            .withProperties(properties)
+            .withIndexes(indexes)
+            .build();
+    Schema paimonTableSchema = gravitinoPaimonTable.toPaimonTableSchema();
+    Assertions.assertArrayEquals(
+        partitionKeys.toArray(new String[0]),
+        paimonTableSchema.partitionKeys().toArray(new String[0]));
+    Assertions.assertEquals(
+        gravitinoPaimonTable.columns().length, 
paimonTableSchema.fields().size());
+    Assertions.assertEquals(3, paimonTableSchema.fields().size());
+    for (int i = 0; i < gravitinoPaimonTable.columns().length; i++) {
+      Column column = gravitinoPaimonTable.columns()[i];
+      DataField dataField = paimonTableSchema.fields().get(i);
+      Assertions.assertEquals(column.name(), dataField.name());
+      Assertions.assertEquals(column.comment(), dataField.description());
+    }
+    Assertions.assertEquals(new IntType().nullable(), 
paimonTableSchema.fields().get(0).type());
+    Assertions.assertEquals(new DateType().nullable(), 
paimonTableSchema.fields().get(1).type());
+    Assertions.assertEquals(
+        new VarCharType(Integer.MAX_VALUE).notNull(), 
paimonTableSchema.fields().get(2).type());
+    Assertions.assertArrayEquals(
+        primaryKeys.toArray(new String[0]), 
paimonTableSchema.primaryKeys().toArray(new String[0]));
   }
 
   private static String genRandomName() {
@@ -480,7 +613,9 @@ public class TestGravitinoPaimonTable {
   private static Column[] createColumns() {
     Column col1 = Column.of("col1", Types.IntegerType.get(), "col_1_comment");
     Column col2 = Column.of("col2", Types.DateType.get(), "col_2_comment");
-    Column col3 = Column.of("col3", Types.StringType.get(), "col_3_comment");
+    Column col3 =
+        Column.of(
+            "col3", Types.StringType.get(), "col_3_comment", false, false, 
DEFAULT_VALUE_NOT_SET);
     return new Column[] {col1, col2, col3};
   }
 }
diff --git a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java
index 087712108..cd0ca2c5b 100644
--- a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java
+++ b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java
@@ -18,7 +18,9 @@
  */
 package org.apache.gravitino.catalog.lakehouse.paimon.integration.test;
 
+import static 
org.apache.gravitino.catalog.lakehouse.paimon.GravitinoPaimonTable.PAIMON_PRIMARY_KEY_INDEX_NAME;
 import static 
org.apache.gravitino.rel.expressions.transforms.Transforms.identity;
+import static org.apache.gravitino.rel.indexes.Indexes.primary;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
@@ -60,6 +62,7 @@ import org.apache.gravitino.rel.expressions.sorts.SortOrder;
 import org.apache.gravitino.rel.expressions.sorts.SortOrders;
 import org.apache.gravitino.rel.expressions.transforms.Transform;
 import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Index;
 import org.apache.gravitino.rel.types.Types;
 import org.apache.paimon.catalog.Catalog.DatabaseNotExistException;
 import org.apache.paimon.catalog.Identifier;
@@ -92,6 +95,7 @@ public abstract class CatalogPaimonBaseIT extends AbstractIT {
   private static final String PAIMON_COL_NAME2 = "paimon_col_name2";
   private static final String PAIMON_COL_NAME3 = "paimon_col_name3";
   private static final String PAIMON_COL_NAME4 = "paimon_col_name4";
+  private static final String PAIMON_COL_NAME5 = "paimon_col_name5";
   private String metalakeName = 
GravitinoITUtils.genRandomName("paimon_it_metalake");
   private String catalogName = 
GravitinoITUtils.genRandomName("paimon_it_catalog");
   private String schemaName = 
GravitinoITUtils.genRandomName("paimon_it_schema");
@@ -375,6 +379,115 @@ public abstract class CatalogPaimonBaseIT extends 
AbstractIT {
     Assertions.assertArrayEquals(partitionKeys, 
schema.partitionKeys().toArray(new String[0]));
   }
 
+  @Test
+  void testCreateAndLoadPaimonPrimaryKeyTable()
+      throws org.apache.paimon.catalog.Catalog.TableNotExistException {
+    // Create table from Gravitino API
+    Column[] columns = createColumns();
+    ArrayList<Column> newColumns = new ArrayList<>(Arrays.asList(columns));
+    Column col5 =
+        Column.of(
+            PAIMON_COL_NAME5,
+            Types.StringType.get(),
+            "col_5_comment",
+            false,
+            false,
+            Column.DEFAULT_VALUE_NOT_SET);
+    newColumns.add(col5);
+    columns = newColumns.toArray(new Column[0]);
+
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
+    Distribution distribution = Distributions.NONE;
+
+    Transform[] partitioning =
+        new Transform[] {identity(PAIMON_COL_NAME1), 
identity(PAIMON_COL_NAME3)};
+    String[] partitionKeys = new String[] {PAIMON_COL_NAME1, PAIMON_COL_NAME3};
+
+    String[] primaryKeys = new String[] {PAIMON_COL_NAME5};
+    Index[] indexes =
+        Collections.singletonList(
+                primary(
+                    PAIMON_PRIMARY_KEY_INDEX_NAME,
+                    new String[][] {new String[] {PAIMON_COL_NAME5}}))
+            .toArray(new Index[0]);
+
+    Map<String, String> properties = createProperties();
+
+    SortOrder[] sortOrders = SortOrders.NONE;
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    Table createdTable =
+        tableCatalog.createTable(
+            tableIdentifier,
+            columns,
+            table_comment,
+            properties,
+            partitioning,
+            distribution,
+            sortOrders,
+            indexes);
+    Assertions.assertEquals(createdTable.name(), tableName);
+    Assertions.assertEquals(createdTable.comment(), table_comment);
+    Assertions.assertArrayEquals(partitioning, createdTable.partitioning());
+    Assertions.assertEquals(indexes.length, createdTable.index().length);
+    for (int i = 0; i < indexes.length; i++) {
+      Assertions.assertEquals(indexes[i].name(), 
createdTable.index()[i].name());
+      Assertions.assertEquals(indexes[i].type(), 
createdTable.index()[i].type());
+      Assertions.assertArrayEquals(indexes[i].fieldNames(), 
createdTable.index()[i].fieldNames());
+    }
+    Assertions.assertEquals(createdTable.columns().length, columns.length);
+    for (int i = 0; i < columns.length; i++) {
+      Assertions.assertEquals(DTOConverters.toDTO(columns[i]), 
createdTable.columns()[i]);
+    }
+
+    Table loadTable = tableCatalog.loadTable(tableIdentifier);
+    Assertions.assertEquals(tableName, loadTable.name());
+    Assertions.assertEquals(table_comment, loadTable.comment());
+    Assertions.assertArrayEquals(partitioning, loadTable.partitioning());
+    String[] loadedPartitionKeys =
+        Arrays.stream(loadTable.partitioning())
+            .map(
+                transform -> {
+                  NamedReference[] references = transform.references();
+                  Assertions.assertTrue(
+                      references.length == 1
+                          && references[0] instanceof 
NamedReference.FieldReference);
+                  NamedReference.FieldReference fieldReference =
+                      (NamedReference.FieldReference) references[0];
+                  return fieldReference.fieldName()[0];
+                })
+            .toArray(String[]::new);
+    Assertions.assertArrayEquals(partitionKeys, loadedPartitionKeys);
+    Assertions.assertEquals(indexes.length, loadTable.index().length);
+    for (int i = 0; i < indexes.length; i++) {
+      Assertions.assertEquals(indexes[i].name(), loadTable.index()[i].name());
+      Assertions.assertEquals(indexes[i].type(), loadTable.index()[i].type());
+      Assertions.assertArrayEquals(indexes[i].fieldNames(), 
loadTable.index()[i].fieldNames());
+    }
+    Assertions.assertEquals(loadTable.columns().length, columns.length);
+    for (int i = 0; i < columns.length; i++) {
+      Assertions.assertEquals(DTOConverters.toDTO(columns[i]), 
loadTable.columns()[i]);
+    }
+
+    // catalog load check
+    org.apache.paimon.table.Table table =
+        paimonCatalog.getTable(Identifier.create(schemaName, tableName));
+    Assertions.assertEquals(tableName, table.name());
+    Assertions.assertTrue(table.comment().isPresent());
+    Assertions.assertEquals(table_comment, table.comment().get());
+    Assertions.assertArrayEquals(partitionKeys, 
table.partitionKeys().toArray(new String[0]));
+    Assertions.assertArrayEquals(primaryKeys, table.primaryKeys().toArray(new 
String[0]));
+    Assertions.assertInstanceOf(FileStoreTable.class, table);
+    FileStoreTable fileStoreTable = (FileStoreTable) table;
+
+    TableSchema schema = fileStoreTable.schema();
+    Assertions.assertEquals(schema.fields().size(), columns.length);
+    for (int i = 0; i < columns.length; i++) {
+      Assertions.assertEquals(columns[i].name(), schema.fieldNames().get(i));
+    }
+    Assertions.assertArrayEquals(partitionKeys, 
schema.partitionKeys().toArray(new String[0]));
+    Assertions.assertArrayEquals(primaryKeys, schema.primaryKeys().toArray(new 
String[0]));
+  }
+
   @Test
   void testCreateTableWithTimestampColumn()
       throws org.apache.paimon.catalog.Catalog.TableNotExistException {

Reply via email to