This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 059cee580 [#4287] feat(catalog-lakehouse-paimon): Support Paimon table with partitions in Gravitino (#4288)
059cee580 is described below

commit 059cee5804059df35d4492a08c4f9c44b3b0874f
Author: cai can <[email protected]>
AuthorDate: Wed Jul 31 09:57:43 2024 +0800

    [#4287] feat(catalog-lakehouse-paimon): Support Paimon table with partitions in Gravitino (#4288)
    
    ### What changes were proposed in this pull request?
    
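    Support creating and loading partitioned Paimon tables in Gravitino. Each partition transform passed to `createTable` must reference exactly one column, and that column becomes a Paimon partition key. When a Paimon table is loaded, its partition keys are returned as identity transforms.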
    
    ### Why are the changes needed?
    
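    Previously, `createTable` rejected any partitioning for Paimon tables, so partitioned Paimon tables could not be created through Gravitino.
    
    Fix: https://github.com/apache/gravitino/issues/4287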
    
    ### Does this PR introduce _any_ user-facing change?
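    No new APIs. Behavior change: `createTable` on a Paimon catalog now accepts partitioning instead of rejecting it, and loaded Paimon tables report their partition keys.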
    
    ### How was this patch tested?
    New unit tests in `TestGravitinoPaimonTable` and integration tests in `CatalogPaimonBaseIT`.
    
    ---------
    
    Co-authored-by: caican <[email protected]>
---
 .../lakehouse/paimon/GravitinoPaimonTable.java     |  26 +++++
 .../lakehouse/paimon/PaimonCatalogOperations.java  |  16 ++-
 .../lakehouse/paimon/TestGravitinoPaimonTable.java | 117 +++++++++++++++++++++
 .../integration/test/CatalogPaimonBaseIT.java      |  92 ++++++++++++++++
 4 files changed, 249 insertions(+), 2 deletions(-)

diff --git 
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java
 
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java
index 8efbc55a6..6df977548 100644
--- 
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java
+++ 
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java
@@ -20,11 +20,18 @@ package org.apache.gravitino.catalog.lakehouse.paimon;
 
 import static org.apache.gravitino.meta.AuditInfo.EMPTY;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
 import lombok.Getter;
 import lombok.ToString;
 import org.apache.gravitino.connector.BaseTable;
 import org.apache.gravitino.connector.TableOperations;
+import org.apache.gravitino.rel.expressions.NamedReference;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.types.DataField;
@@ -52,6 +59,19 @@ public class GravitinoPaimonTable extends BaseTable {
    */
   public Schema toPaimonTableSchema() {
     Schema.Builder builder = 
Schema.newBuilder().comment(comment).options(properties);
+    if (partitioning != null) {
+      builder.partitionKeys(
+          Arrays.stream(partitioning)
+              .map(
+                  partition -> {
+                    NamedReference[] references = partition.references();
+                    Preconditions.checkArgument(
+                        references.length == 1,
+                        "Partitioning column must be single-column, like 
'a'.");
+                    return references[0].toString();
+                  })
+              .collect(Collectors.toList()));
+    }
     for (int index = 0; index < columns.length; index++) {
       DataField dataField = GravitinoPaimonColumn.toPaimonColumn(index, 
columns[index]);
       builder.column(dataField.name(), dataField.type(), 
dataField.description());
@@ -71,12 +91,17 @@ public class GravitinoPaimonTable extends BaseTable {
         .withColumns(
             GravitinoPaimonColumn.fromPaimonRowType(table.rowType())
                 .toArray(new GravitinoPaimonColumn[0]))
+        .withPartitioning(toGravitinoPartitioning(table.partitionKeys()))
         .withComment(table.comment().orElse(null))
         .withProperties(table.options())
         .withAuditInfo(EMPTY)
         .build();
   }
 
+  public static Transform[] toGravitinoPartitioning(List<String> 
partitionKeys) {
+    return 
partitionKeys.stream().map(Transforms::identity).toArray(Transform[]::new);
+  }
+
   /** A builder class for constructing {@link GravitinoPaimonTable} instance. 
*/
   public static class Builder extends BaseTableBuilder<Builder, 
GravitinoPaimonTable> {
 
@@ -94,6 +119,7 @@ public class GravitinoPaimonTable extends BaseTable {
       paimonTable.name = name;
       paimonTable.comment = comment;
       paimonTable.columns = columns;
+      paimonTable.partitioning = partitioning;
       paimonTable.properties = properties == null ? Maps.newHashMap() : 
Maps.newHashMap(properties);
       paimonTable.auditInfo = auditInfo;
       return paimonTable;
diff --git 
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogOperations.java
 
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogOperations.java
index 225c8e017..57095e237 100644
--- 
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogOperations.java
+++ 
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogOperations.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.catalog.lakehouse.paimon;
 import static 
org.apache.gravitino.catalog.lakehouse.paimon.GravitinoPaimonTable.fromPaimonTable;
 import static 
org.apache.gravitino.catalog.lakehouse.paimon.PaimonSchema.fromPaimonProperties;
 import static org.apache.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;
+import static 
org.apache.gravitino.rel.expressions.transforms.Transforms.EMPTY_TRANSFORM;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -50,6 +51,7 @@ import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.rel.Column;
 import org.apache.gravitino.rel.TableCatalog;
 import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.NamedReference;
 import org.apache.gravitino.rel.expressions.distributions.Distribution;
 import org.apache.gravitino.rel.expressions.distributions.Distributions;
 import org.apache.gravitino.rel.expressions.sorts.SortOrder;
@@ -320,9 +322,18 @@ public class PaimonCatalogOperations implements 
CatalogOperations, SupportsSchem
     if (!schemaExists(schemaIdentifier)) {
       throw new NoSuchSchemaException(NO_SUCH_SCHEMA_EXCEPTION, 
schemaIdentifier);
     }
+    if (partitioning == null) {
+      partitioning = EMPTY_TRANSFORM;
+    }
     Preconditions.checkArgument(
-        partitioning == null || partitioning.length == 0,
-        "Table Partitions are not supported when creating a Paimon table in 
Gravitino now.");
+        Arrays.stream(partitioning)
+            .allMatch(
+                partition -> {
+                  NamedReference[] references = partition.references();
+                  return references.length == 1
+                      && references[0] instanceof 
NamedReference.FieldReference;
+                }),
+        "Paimon partition columns should not be nested.");
     Preconditions.checkArgument(
         sortOrders == null || sortOrders.length == 0,
         "Sort orders are not supported for Paimon in Gravitino.");
@@ -352,6 +363,7 @@ public class PaimonCatalogOperations implements 
CatalogOperations, SupportsSchem
                               .build();
                         })
                     .toArray(GravitinoPaimonColumn[]::new))
+            .withPartitioning(partitioning)
             .withComment(comment)
             .withProperties(properties)
             .withAuditInfo(
diff --git 
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/TestGravitinoPaimonTable.java
 
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/TestGravitinoPaimonTable.java
index 3c9297085..e4dc306ae 100644
--- 
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/TestGravitinoPaimonTable.java
+++ 
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/TestGravitinoPaimonTable.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.catalog.lakehouse.paimon;
 import static 
org.apache.gravitino.catalog.lakehouse.paimon.GravitinoPaimonColumn.fromPaimonColumn;
 import static 
org.apache.gravitino.catalog.lakehouse.paimon.TestPaimonCatalog.PAIMON_PROPERTIES_METADATA;
 import static 
org.apache.gravitino.catalog.lakehouse.paimon.utils.TableOpsUtils.checkColumnCapability;
+import static 
org.apache.gravitino.rel.expressions.transforms.Transforms.identity;
 
 import com.google.common.collect.Maps;
 import java.io.File;
@@ -29,6 +30,7 @@ import java.nio.file.Paths;
 import java.time.Instant;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.commons.io.FileUtils;
@@ -44,6 +46,7 @@ import org.apache.gravitino.meta.CatalogEntity;
 import org.apache.gravitino.rel.Column;
 import org.apache.gravitino.rel.Table;
 import org.apache.gravitino.rel.TableCatalog;
+import org.apache.gravitino.rel.expressions.NamedReference;
 import org.apache.gravitino.rel.expressions.distributions.Distributions;
 import org.apache.gravitino.rel.expressions.sorts.SortOrder;
 import org.apache.gravitino.rel.expressions.transforms.Transform;
@@ -201,6 +204,65 @@ public class TestGravitinoPaimonTable {
             .contains(String.format("Paimon table %s already exists", 
tableIdentifier)));
   }
 
+  @Test
+  void testCreatePaimonPartitionedTable() {
+    String paimonTableName = "test_paimon_partitioned_table";
+    NameIdentifier tableIdentifier = NameIdentifier.of(paimonSchema.name(), 
paimonTableName);
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("key1", "val1");
+    properties.put("key2", "val2");
+
+    GravitinoPaimonColumn col1 =
+        fromPaimonColumn(new DataField(0, "col_1", DataTypes.INT().nullable(), 
PAIMON_COMMENT));
+    GravitinoPaimonColumn col2 =
+        fromPaimonColumn(new DataField(1, "col_2", DataTypes.DATE().notNull(), 
PAIMON_COMMENT));
+    GravitinoPaimonColumn col3 =
+        fromPaimonColumn(new DataField(2, "col_3", 
DataTypes.STRING().notNull(), PAIMON_COMMENT));
+    Column[] columns = new Column[] {col1, col2, col3};
+
+    Transform[] transforms = new Transform[] {identity("col_1"), 
identity("col_2")};
+    String[] partitionKeys = new String[] {"col_1", "col_2"};
+
+    Table table =
+        paimonCatalogOperations.createTable(
+            tableIdentifier,
+            columns,
+            PAIMON_COMMENT,
+            properties,
+            transforms,
+            Distributions.NONE,
+            new SortOrder[0]);
+
+    Assertions.assertEquals(tableIdentifier.name(), table.name());
+    Assertions.assertEquals(PAIMON_COMMENT, table.comment());
+    Assertions.assertEquals("val1", table.properties().get("key1"));
+    Assertions.assertEquals("val2", table.properties().get("key2"));
+    Assertions.assertArrayEquals(transforms, table.partitioning());
+
+    Table loadedTable = paimonCatalogOperations.loadTable(tableIdentifier);
+
+    Assertions.assertEquals("val1", loadedTable.properties().get("key1"));
+    Assertions.assertEquals("val2", loadedTable.properties().get("key2"));
+    Assertions.assertTrue(loadedTable.columns()[0].nullable());
+    Assertions.assertFalse(loadedTable.columns()[1].nullable());
+    Assertions.assertFalse(loadedTable.columns()[2].nullable());
+    Assertions.assertArrayEquals(transforms, loadedTable.partitioning());
+    String[] loadedPartitionKeys =
+        Arrays.stream(loadedTable.partitioning())
+            .map(
+                transform -> {
+                  NamedReference[] references = transform.references();
+                  Assertions.assertTrue(
+                      references.length == 1
+                          && references[0] instanceof 
NamedReference.FieldReference);
+                  NamedReference.FieldReference fieldReference =
+                      (NamedReference.FieldReference) references[0];
+                  return fieldReference.fieldName()[0];
+                })
+            .toArray(String[]::new);
+    Assertions.assertArrayEquals(partitionKeys, loadedPartitionKeys);
+  }
+
   @Test
   void testDropPaimonTable() {
     NameIdentifier tableIdentifier = NameIdentifier.of(paimonSchema.name(), 
genRandomName());
@@ -330,6 +392,61 @@ public class TestGravitinoPaimonTable {
         new VarCharType(Integer.MAX_VALUE).nullable(), 
paimonTableSchema.fields().get(2).type());
   }
 
+  @Test
+  public void testGravitinoToPaimonTableWithPartitions() {
+    Column[] columns = createColumns();
+    NameIdentifier identifier = NameIdentifier.of("test_schema", 
"test_partitioned_table");
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("key1", "val1");
+
+    Transform[] partitions =
+        new Transform[] {identity(columns[0].name()), 
identity(columns[1].name())};
+    List<String> partitionKeys = Arrays.asList(columns[0].name(), 
columns[1].name());
+
+    GravitinoPaimonTable gravitinoPaimonTable =
+        GravitinoPaimonTable.builder()
+            .withName(identifier.name())
+            .withColumns(
+                Arrays.stream(columns)
+                    .map(
+                        column -> {
+                          checkColumnCapability(
+                              column.name(), column.defaultValue(), 
column.autoIncrement());
+                          return GravitinoPaimonColumn.builder()
+                              .withName(column.name())
+                              .withType(column.dataType())
+                              .withComment(column.comment())
+                              .withNullable(column.nullable())
+                              .withAutoIncrement(column.autoIncrement())
+                              .withDefaultValue(column.defaultValue())
+                              .build();
+                        })
+                    .toArray(GravitinoPaimonColumn[]::new))
+            .withPartitioning(partitions)
+            .withComment("test_table_comment")
+            .withProperties(properties)
+            .build();
+    Schema paimonTableSchema = gravitinoPaimonTable.toPaimonTableSchema();
+    Assertions.assertArrayEquals(
+        partitionKeys.toArray(new String[0]),
+        paimonTableSchema.partitionKeys().toArray(new String[0]));
+    Assertions.assertEquals(gravitinoPaimonTable.comment(), 
gravitinoPaimonTable.comment());
+    Assertions.assertEquals(gravitinoPaimonTable.properties(), 
paimonTableSchema.options());
+    Assertions.assertEquals(
+        gravitinoPaimonTable.columns().length, 
paimonTableSchema.fields().size());
+    Assertions.assertEquals(3, paimonTableSchema.fields().size());
+    for (int i = 0; i < gravitinoPaimonTable.columns().length; i++) {
+      Column column = gravitinoPaimonTable.columns()[i];
+      DataField dataField = paimonTableSchema.fields().get(i);
+      Assertions.assertEquals(column.name(), dataField.name());
+      Assertions.assertEquals(column.comment(), dataField.description());
+    }
+    Assertions.assertEquals(new IntType().nullable(), 
paimonTableSchema.fields().get(0).type());
+    Assertions.assertEquals(new DateType().nullable(), 
paimonTableSchema.fields().get(1).type());
+    Assertions.assertEquals(
+        new VarCharType(Integer.MAX_VALUE).nullable(), 
paimonTableSchema.fields().get(2).type());
+  }
+
   private static String genRandomName() {
     return UUID.randomUUID().toString().replace("-", "");
   }
diff --git 
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java
 
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java
index 399042ef0..087712108 100644
--- 
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java
+++ 
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java
@@ -18,6 +18,8 @@
  */
 package org.apache.gravitino.catalog.lakehouse.paimon.integration.test;
 
+import static 
org.apache.gravitino.rel.expressions.transforms.Transforms.identity;
+
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import java.time.LocalDate;
@@ -51,6 +53,7 @@ import 
org.apache.gravitino.integration.test.util.GravitinoITUtils;
 import org.apache.gravitino.rel.Column;
 import org.apache.gravitino.rel.Table;
 import org.apache.gravitino.rel.TableCatalog;
+import org.apache.gravitino.rel.expressions.NamedReference;
 import org.apache.gravitino.rel.expressions.distributions.Distribution;
 import org.apache.gravitino.rel.expressions.distributions.Distributions;
 import org.apache.gravitino.rel.expressions.sorts.SortOrder;
@@ -283,6 +286,95 @@ public abstract class CatalogPaimonBaseIT extends 
AbstractIT {
                     sortOrders));
   }
 
+  @Test
+  void testCreateAndLoadPaimonPartitionedTable()
+      throws org.apache.paimon.catalog.Catalog.TableNotExistException {
+    // Create table from Gravitino API
+    Column[] columns = createColumns();
+
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
+    Distribution distribution = Distributions.NONE;
+
+    Transform[] partitioning =
+        new Transform[] {identity(PAIMON_COL_NAME1), 
identity(PAIMON_COL_NAME3)};
+    String[] partitionKeys = new String[] {PAIMON_COL_NAME1, PAIMON_COL_NAME3};
+    SortOrder[] sortOrders = SortOrders.NONE;
+    Map<String, String> properties = createProperties();
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    Table createdTable =
+        tableCatalog.createTable(
+            tableIdentifier,
+            columns,
+            table_comment,
+            properties,
+            partitioning,
+            distribution,
+            sortOrders);
+    Assertions.assertEquals(createdTable.name(), tableName);
+    Map<String, String> resultProp = createdTable.properties();
+    for (Map.Entry<String, String> entry : properties.entrySet()) {
+      Assertions.assertTrue(resultProp.containsKey(entry.getKey()));
+      Assertions.assertEquals(entry.getValue(), 
resultProp.get(entry.getKey()));
+    }
+    Assertions.assertEquals(createdTable.comment(), table_comment);
+    Assertions.assertArrayEquals(partitioning, createdTable.partitioning());
+    Assertions.assertEquals(createdTable.columns().length, columns.length);
+
+    for (int i = 0; i < columns.length; i++) {
+      Assertions.assertEquals(DTOConverters.toDTO(columns[i]), 
createdTable.columns()[i]);
+    }
+
+    Table loadTable = tableCatalog.loadTable(tableIdentifier);
+    Assertions.assertEquals(tableName, loadTable.name());
+    Assertions.assertEquals(table_comment, loadTable.comment());
+    resultProp = loadTable.properties();
+    for (Map.Entry<String, String> entry : properties.entrySet()) {
+      Assertions.assertTrue(resultProp.containsKey(entry.getKey()));
+      Assertions.assertEquals(entry.getValue(), 
resultProp.get(entry.getKey()));
+    }
+    Assertions.assertArrayEquals(partitioning, loadTable.partitioning());
+    String[] loadedPartitionKeys =
+        Arrays.stream(loadTable.partitioning())
+            .map(
+                transform -> {
+                  NamedReference[] references = transform.references();
+                  Assertions.assertTrue(
+                      references.length == 1
+                          && references[0] instanceof 
NamedReference.FieldReference);
+                  NamedReference.FieldReference fieldReference =
+                      (NamedReference.FieldReference) references[0];
+                  return fieldReference.fieldName()[0];
+                })
+            .toArray(String[]::new);
+    Assertions.assertArrayEquals(partitionKeys, loadedPartitionKeys);
+    Assertions.assertEquals(loadTable.columns().length, columns.length);
+    for (int i = 0; i < columns.length; i++) {
+      Assertions.assertEquals(DTOConverters.toDTO(columns[i]), 
loadTable.columns()[i]);
+    }
+
+    // catalog load check
+    org.apache.paimon.table.Table table =
+        paimonCatalog.getTable(Identifier.create(schemaName, tableName));
+    Assertions.assertEquals(tableName, table.name());
+    Assertions.assertTrue(table.comment().isPresent());
+    Assertions.assertEquals(table_comment, table.comment().get());
+    resultProp = table.options();
+    for (Map.Entry<String, String> entry : properties.entrySet()) {
+      Assertions.assertTrue(resultProp.containsKey(entry.getKey()));
+      Assertions.assertEquals(entry.getValue(), 
resultProp.get(entry.getKey()));
+    }
+    Assertions.assertArrayEquals(partitionKeys, 
table.partitionKeys().toArray(new String[0]));
+    Assertions.assertInstanceOf(FileStoreTable.class, table);
+    FileStoreTable fileStoreTable = (FileStoreTable) table;
+
+    TableSchema schema = fileStoreTable.schema();
+    Assertions.assertEquals(schema.fields().size(), columns.length);
+    for (int i = 0; i < columns.length; i++) {
+      Assertions.assertEquals(columns[i].name(), schema.fieldNames().get(i));
+    }
+    Assertions.assertArrayEquals(partitionKeys, 
schema.partitionKeys().toArray(new String[0]));
+  }
+
   @Test
   void testCreateTableWithTimestampColumn()
       throws org.apache.paimon.catalog.Catalog.TableNotExistException {

Reply via email to