This is an automated email from the ASF dual-hosted git repository.
fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 059cee580 [#4287] feat(catalog-lakehouse-paimon): Support Paimon table
with partitions in Gravitino (#4288)
059cee580 is described below
commit 059cee5804059df35d4492a08c4f9c44b3b0874f
Author: cai can <[email protected]>
AuthorDate: Wed Jul 31 09:57:43 2024 +0800
[#4287] feat(catalog-lakehouse-paimon): Support Paimon table with
partitions in Gravitino (#4288)
### What changes were proposed in this pull request?
Support creating and loading Paimon tables with partitions in Gravitino.
### Why are the changes needed?
Fix: https://github.com/apache/gravitino/issues/4287
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added new unit tests (UTs) and integration tests (ITs).
---------
Co-authored-by: caican <[email protected]>
---
.../lakehouse/paimon/GravitinoPaimonTable.java | 26 +++++
.../lakehouse/paimon/PaimonCatalogOperations.java | 16 ++-
.../lakehouse/paimon/TestGravitinoPaimonTable.java | 117 +++++++++++++++++++++
.../integration/test/CatalogPaimonBaseIT.java | 92 ++++++++++++++++
4 files changed, 249 insertions(+), 2 deletions(-)
diff --git
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java
index 8efbc55a6..6df977548 100644
---
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java
+++
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java
@@ -20,11 +20,18 @@ package org.apache.gravitino.catalog.lakehouse.paimon;
import static org.apache.gravitino.meta.AuditInfo.EMPTY;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
import lombok.Getter;
import lombok.ToString;
import org.apache.gravitino.connector.BaseTable;
import org.apache.gravitino.connector.TableOperations;
+import org.apache.gravitino.rel.expressions.NamedReference;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.DataField;
@@ -52,6 +59,19 @@ public class GravitinoPaimonTable extends BaseTable {
*/
public Schema toPaimonTableSchema() {
Schema.Builder builder =
Schema.newBuilder().comment(comment).options(properties);
+ if (partitioning != null) {
+ builder.partitionKeys(
+ Arrays.stream(partitioning)
+ .map(
+ partition -> {
+ NamedReference[] references = partition.references();
+ Preconditions.checkArgument(
+ references.length == 1,
+ "Partitioning column must be single-column, like
'a'.");
+ return references[0].toString();
+ })
+ .collect(Collectors.toList()));
+ }
for (int index = 0; index < columns.length; index++) {
DataField dataField = GravitinoPaimonColumn.toPaimonColumn(index,
columns[index]);
builder.column(dataField.name(), dataField.type(),
dataField.description());
@@ -71,12 +91,17 @@ public class GravitinoPaimonTable extends BaseTable {
.withColumns(
GravitinoPaimonColumn.fromPaimonRowType(table.rowType())
.toArray(new GravitinoPaimonColumn[0]))
+ .withPartitioning(toGravitinoPartitioning(table.partitionKeys()))
.withComment(table.comment().orElse(null))
.withProperties(table.options())
.withAuditInfo(EMPTY)
.build();
}
+  /**
+   * Converts Paimon partition keys into Gravitino identity partition transforms.
+   *
+   * @param partitionKeys the Paimon partition column names
+   * @return one identity transform per key, in the same order as {@code partitionKeys}
+   */
+  public static Transform[] toGravitinoPartitioning(List<String> partitionKeys) {
+    Transform[] transforms = new Transform[partitionKeys.size()];
+    for (int i = 0; i < transforms.length; i++) {
+      transforms[i] = Transforms.identity(partitionKeys.get(i));
+    }
+    return transforms;
+  }
+
/** A builder class for constructing {@link GravitinoPaimonTable} instance.
*/
public static class Builder extends BaseTableBuilder<Builder,
GravitinoPaimonTable> {
@@ -94,6 +119,7 @@ public class GravitinoPaimonTable extends BaseTable {
paimonTable.name = name;
paimonTable.comment = comment;
paimonTable.columns = columns;
+ paimonTable.partitioning = partitioning;
paimonTable.properties = properties == null ? Maps.newHashMap() :
Maps.newHashMap(properties);
paimonTable.auditInfo = auditInfo;
return paimonTable;
diff --git
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogOperations.java
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogOperations.java
index 225c8e017..57095e237 100644
---
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogOperations.java
+++
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogOperations.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.catalog.lakehouse.paimon;
import static
org.apache.gravitino.catalog.lakehouse.paimon.GravitinoPaimonTable.fromPaimonTable;
import static
org.apache.gravitino.catalog.lakehouse.paimon.PaimonSchema.fromPaimonProperties;
import static org.apache.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;
+import static
org.apache.gravitino.rel.expressions.transforms.Transforms.EMPTY_TRANSFORM;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -50,6 +51,7 @@ import org.apache.gravitino.meta.AuditInfo;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.TableCatalog;
import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.NamedReference;
import org.apache.gravitino.rel.expressions.distributions.Distribution;
import org.apache.gravitino.rel.expressions.distributions.Distributions;
import org.apache.gravitino.rel.expressions.sorts.SortOrder;
@@ -320,9 +322,18 @@ public class PaimonCatalogOperations implements
CatalogOperations, SupportsSchem
if (!schemaExists(schemaIdentifier)) {
throw new NoSuchSchemaException(NO_SUCH_SCHEMA_EXCEPTION,
schemaIdentifier);
}
+ if (partitioning == null) {
+ partitioning = EMPTY_TRANSFORM;
+ }
Preconditions.checkArgument(
- partitioning == null || partitioning.length == 0,
- "Table Partitions are not supported when creating a Paimon table in
Gravitino now.");
+ Arrays.stream(partitioning)
+ .allMatch(
+ partition -> {
+ NamedReference[] references = partition.references();
+ return references.length == 1
+ && references[0] instanceof
NamedReference.FieldReference;
+ }),
+ "Paimon partition columns should not be nested.");
Preconditions.checkArgument(
sortOrders == null || sortOrders.length == 0,
"Sort orders are not supported for Paimon in Gravitino.");
@@ -352,6 +363,7 @@ public class PaimonCatalogOperations implements
CatalogOperations, SupportsSchem
.build();
})
.toArray(GravitinoPaimonColumn[]::new))
+ .withPartitioning(partitioning)
.withComment(comment)
.withProperties(properties)
.withAuditInfo(
diff --git
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/TestGravitinoPaimonTable.java
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/TestGravitinoPaimonTable.java
index 3c9297085..e4dc306ae 100644
---
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/TestGravitinoPaimonTable.java
+++
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/TestGravitinoPaimonTable.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.catalog.lakehouse.paimon;
import static
org.apache.gravitino.catalog.lakehouse.paimon.GravitinoPaimonColumn.fromPaimonColumn;
import static
org.apache.gravitino.catalog.lakehouse.paimon.TestPaimonCatalog.PAIMON_PROPERTIES_METADATA;
import static
org.apache.gravitino.catalog.lakehouse.paimon.utils.TableOpsUtils.checkColumnCapability;
+import static
org.apache.gravitino.rel.expressions.transforms.Transforms.identity;
import com.google.common.collect.Maps;
import java.io.File;
@@ -29,6 +30,7 @@ import java.nio.file.Paths;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.io.FileUtils;
@@ -44,6 +46,7 @@ import org.apache.gravitino.meta.CatalogEntity;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.Table;
import org.apache.gravitino.rel.TableCatalog;
+import org.apache.gravitino.rel.expressions.NamedReference;
import org.apache.gravitino.rel.expressions.distributions.Distributions;
import org.apache.gravitino.rel.expressions.sorts.SortOrder;
import org.apache.gravitino.rel.expressions.transforms.Transform;
@@ -201,6 +204,65 @@ public class TestGravitinoPaimonTable {
.contains(String.format("Paimon table %s already exists",
tableIdentifier)));
}
+  /**
+   * Creates a table with identity partitions on col_1 and col_2, then checks that both the table
+   * returned by createTable and the table returned by loadTable report that partitioning, along
+   * with the expected properties and (for the loaded table) column nullability.
+   */
+  @Test
+  void testCreatePaimonPartitionedTable() {
+    String paimonTableName = "test_paimon_partitioned_table";
+    NameIdentifier ident = NameIdentifier.of(paimonSchema.name(), paimonTableName);
+
+    Map<String, String> props = Maps.newHashMap();
+    props.put("key1", "val1");
+    props.put("key2", "val2");
+
+    Column[] cols =
+        new Column[] {
+          fromPaimonColumn(
+              new DataField(0, "col_1", DataTypes.INT().nullable(), PAIMON_COMMENT)),
+          fromPaimonColumn(
+              new DataField(1, "col_2", DataTypes.DATE().notNull(), PAIMON_COMMENT)),
+          fromPaimonColumn(
+              new DataField(2, "col_3", DataTypes.STRING().notNull(), PAIMON_COMMENT))
+        };
+
+    Transform[] expectedPartitioning = {identity("col_1"), identity("col_2")};
+    String[] expectedKeys = {"col_1", "col_2"};
+
+    Table created =
+        paimonCatalogOperations.createTable(
+            ident,
+            cols,
+            PAIMON_COMMENT,
+            props,
+            expectedPartitioning,
+            Distributions.NONE,
+            new SortOrder[0]);
+
+    Assertions.assertEquals(ident.name(), created.name());
+    Assertions.assertEquals(PAIMON_COMMENT, created.comment());
+    Assertions.assertEquals("val1", created.properties().get("key1"));
+    Assertions.assertEquals("val2", created.properties().get("key2"));
+    Assertions.assertArrayEquals(expectedPartitioning, created.partitioning());
+
+    // Reload the table and verify the same attributes on the loaded copy.
+    Table loaded = paimonCatalogOperations.loadTable(ident);
+
+    Assertions.assertEquals("val1", loaded.properties().get("key1"));
+    Assertions.assertEquals("val2", loaded.properties().get("key2"));
+    Assertions.assertTrue(loaded.columns()[0].nullable());
+    Assertions.assertFalse(loaded.columns()[1].nullable());
+    Assertions.assertFalse(loaded.columns()[2].nullable());
+    Assertions.assertArrayEquals(expectedPartitioning, loaded.partitioning());
+
+    // Each partition should reference exactly one field; collect that field's first name part.
+    Transform[] loadedPartitioning = loaded.partitioning();
+    String[] loadedKeys = new String[loadedPartitioning.length];
+    for (int i = 0; i < loadedPartitioning.length; i++) {
+      NamedReference[] refs = loadedPartitioning[i].references();
+      Assertions.assertTrue(
+          refs.length == 1 && refs[0] instanceof NamedReference.FieldReference);
+      loadedKeys[i] = ((NamedReference.FieldReference) refs[0]).fieldName()[0];
+    }
+    Assertions.assertArrayEquals(expectedKeys, loadedKeys);
+  }
+
@Test
void testDropPaimonTable() {
NameIdentifier tableIdentifier = NameIdentifier.of(paimonSchema.name(),
genRandomName());
@@ -330,6 +392,61 @@ public class TestGravitinoPaimonTable {
new VarCharType(Integer.MAX_VALUE).nullable(),
paimonTableSchema.fields().get(2).type());
}
+  /**
+   * Builds a GravitinoPaimonTable partitioned by identity transforms on the first two columns and
+   * checks that toPaimonTableSchema() produces a Paimon schema with matching partition keys,
+   * comment, options and fields.
+   */
+  @Test
+  public void testGravitinoToPaimonTableWithPartitions() {
+    Column[] columns = createColumns();
+    NameIdentifier identifier = NameIdentifier.of("test_schema", "test_partitioned_table");
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("key1", "val1");
+
+    Transform[] partitions =
+        new Transform[] {identity(columns[0].name()), identity(columns[1].name())};
+    List<String> partitionKeys = Arrays.asList(columns[0].name(), columns[1].name());
+
+    GravitinoPaimonTable gravitinoPaimonTable =
+        GravitinoPaimonTable.builder()
+            .withName(identifier.name())
+            .withColumns(
+                Arrays.stream(columns)
+                    .map(
+                        column -> {
+                          checkColumnCapability(
+                              column.name(), column.defaultValue(), column.autoIncrement());
+                          return GravitinoPaimonColumn.builder()
+                              .withName(column.name())
+                              .withType(column.dataType())
+                              .withComment(column.comment())
+                              .withNullable(column.nullable())
+                              .withAutoIncrement(column.autoIncrement())
+                              .withDefaultValue(column.defaultValue())
+                              .build();
+                        })
+                    .toArray(GravitinoPaimonColumn[]::new))
+            .withPartitioning(partitions)
+            .withComment("test_table_comment")
+            .withProperties(properties)
+            .build();
+    Schema paimonTableSchema = gravitinoPaimonTable.toPaimonTableSchema();
+    Assertions.assertArrayEquals(
+        partitionKeys.toArray(new String[0]),
+        paimonTableSchema.partitionKeys().toArray(new String[0]));
+    // Compare against the converted Paimon schema; comparing the Gravitino table's comment with
+    // itself would always pass and never exercise the conversion.
+    Assertions.assertEquals(gravitinoPaimonTable.comment(), paimonTableSchema.comment());
+    Assertions.assertEquals(gravitinoPaimonTable.properties(), paimonTableSchema.options());
+    Assertions.assertEquals(
+        gravitinoPaimonTable.columns().length, paimonTableSchema.fields().size());
+    Assertions.assertEquals(3, paimonTableSchema.fields().size());
+    for (int i = 0; i < gravitinoPaimonTable.columns().length; i++) {
+      Column column = gravitinoPaimonTable.columns()[i];
+      DataField dataField = paimonTableSchema.fields().get(i);
+      Assertions.assertEquals(column.name(), dataField.name());
+      Assertions.assertEquals(column.comment(), dataField.description());
+    }
+    Assertions.assertEquals(new IntType().nullable(), paimonTableSchema.fields().get(0).type());
+    Assertions.assertEquals(new DateType().nullable(), paimonTableSchema.fields().get(1).type());
+    Assertions.assertEquals(
+        new VarCharType(Integer.MAX_VALUE).nullable(), paimonTableSchema.fields().get(2).type());
+  }
+
private static String genRandomName() {
return UUID.randomUUID().toString().replace("-", "");
}
diff --git
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java
index 399042ef0..087712108 100644
---
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java
+++
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java
@@ -18,6 +18,8 @@
*/
package org.apache.gravitino.catalog.lakehouse.paimon.integration.test;
+import static
org.apache.gravitino.rel.expressions.transforms.Transforms.identity;
+
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.time.LocalDate;
@@ -51,6 +53,7 @@ import
org.apache.gravitino.integration.test.util.GravitinoITUtils;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.Table;
import org.apache.gravitino.rel.TableCatalog;
+import org.apache.gravitino.rel.expressions.NamedReference;
import org.apache.gravitino.rel.expressions.distributions.Distribution;
import org.apache.gravitino.rel.expressions.distributions.Distributions;
import org.apache.gravitino.rel.expressions.sorts.SortOrder;
@@ -283,6 +286,95 @@ public abstract class CatalogPaimonBaseIT extends
AbstractIT {
sortOrders));
}
+  /**
+   * Creates a table partitioned by identity transforms on PAIMON_COL_NAME1 and PAIMON_COL_NAME3
+   * through the Gravitino API, then verifies name, comment, properties, partitioning and columns on
+   * the created table, on the table reloaded through Gravitino, and on the table read directly
+   * from the Paimon catalog.
+   */
+  @Test
+  void testCreateAndLoadPaimonPartitionedTable()
+      throws org.apache.paimon.catalog.Catalog.TableNotExistException {
+    // Create table from Gravitino API
+    Column[] columns = createColumns();
+
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
+    Distribution distribution = Distributions.NONE;
+
+    Transform[] partitioning =
+        new Transform[] {identity(PAIMON_COL_NAME1), identity(PAIMON_COL_NAME3)};
+    String[] partitionKeys = new String[] {PAIMON_COL_NAME1, PAIMON_COL_NAME3};
+    SortOrder[] sortOrders = SortOrders.NONE;
+    Map<String, String> properties = createProperties();
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    Table createdTable =
+        tableCatalog.createTable(
+            tableIdentifier,
+            columns,
+            table_comment,
+            properties,
+            partitioning,
+            distribution,
+            sortOrders);
+    // JUnit assertEquals takes (expected, actual); keep that order so failure messages are accurate.
+    Assertions.assertEquals(tableName, createdTable.name());
+    Map<String, String> resultProp = createdTable.properties();
+    for (Map.Entry<String, String> entry : properties.entrySet()) {
+      Assertions.assertTrue(resultProp.containsKey(entry.getKey()));
+      Assertions.assertEquals(entry.getValue(), resultProp.get(entry.getKey()));
+    }
+    Assertions.assertEquals(table_comment, createdTable.comment());
+    Assertions.assertArrayEquals(partitioning, createdTable.partitioning());
+    Assertions.assertEquals(columns.length, createdTable.columns().length);
+
+    for (int i = 0; i < columns.length; i++) {
+      Assertions.assertEquals(DTOConverters.toDTO(columns[i]), createdTable.columns()[i]);
+    }
+
+    // Reload through the Gravitino API and repeat the checks.
+    Table loadTable = tableCatalog.loadTable(tableIdentifier);
+    Assertions.assertEquals(tableName, loadTable.name());
+    Assertions.assertEquals(table_comment, loadTable.comment());
+    resultProp = loadTable.properties();
+    for (Map.Entry<String, String> entry : properties.entrySet()) {
+      Assertions.assertTrue(resultProp.containsKey(entry.getKey()));
+      Assertions.assertEquals(entry.getValue(), resultProp.get(entry.getKey()));
+    }
+    Assertions.assertArrayEquals(partitioning, loadTable.partitioning());
+    // Each partition should reference exactly one field; collect that field's first name part.
+    String[] loadedPartitionKeys =
+        Arrays.stream(loadTable.partitioning())
+            .map(
+                transform -> {
+                  NamedReference[] references = transform.references();
+                  Assertions.assertTrue(
+                      references.length == 1
+                          && references[0] instanceof NamedReference.FieldReference);
+                  NamedReference.FieldReference fieldReference =
+                      (NamedReference.FieldReference) references[0];
+                  return fieldReference.fieldName()[0];
+                })
+            .toArray(String[]::new);
+    Assertions.assertArrayEquals(partitionKeys, loadedPartitionKeys);
+    Assertions.assertEquals(columns.length, loadTable.columns().length);
+    for (int i = 0; i < columns.length; i++) {
+      Assertions.assertEquals(DTOConverters.toDTO(columns[i]), loadTable.columns()[i]);
+    }
+
+    // catalog load check: read the table directly from the Paimon catalog
+    org.apache.paimon.table.Table table =
+        paimonCatalog.getTable(Identifier.create(schemaName, tableName));
+    Assertions.assertEquals(tableName, table.name());
+    Assertions.assertTrue(table.comment().isPresent());
+    Assertions.assertEquals(table_comment, table.comment().get());
+    resultProp = table.options();
+    for (Map.Entry<String, String> entry : properties.entrySet()) {
+      Assertions.assertTrue(resultProp.containsKey(entry.getKey()));
+      Assertions.assertEquals(entry.getValue(), resultProp.get(entry.getKey()));
+    }
+    Assertions.assertArrayEquals(partitionKeys, table.partitionKeys().toArray(new String[0]));
+    Assertions.assertInstanceOf(FileStoreTable.class, table);
+    FileStoreTable fileStoreTable = (FileStoreTable) table;
+
+    TableSchema schema = fileStoreTable.schema();
+    Assertions.assertEquals(columns.length, schema.fields().size());
+    for (int i = 0; i < columns.length; i++) {
+      Assertions.assertEquals(columns[i].name(), schema.fieldNames().get(i));
+    }
+    Assertions.assertArrayEquals(partitionKeys, schema.partitionKeys().toArray(new String[0]));
+  }
+
@Test
void testCreateTableWithTimestampColumn()
throws org.apache.paimon.catalog.Catalog.TableNotExistException {