This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new 5f4d733b3 [#4291] feat(catalog-lakehouse-paimon): Support Paimon table
with primary keys in Gravitino (#4383)
5f4d733b3 is described below
commit 5f4d733b38572367a21bcb9db3bb948111dc6bf0
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Aug 6 10:26:48 2024 +0800
[#4291] feat(catalog-lakehouse-paimon): Support Paimon table with primary
keys in Gravitino (#4383)
### What changes were proposed in this pull request?
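Support Paimon tables with primary keys in Gravitino. On table creation, a Gravitino primary key index is mapped to Paimon primary keys. On table load, Paimon primary keys are exposed as a Gravitino primary key index.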
### Why are the changes needed?
Fix: https://github.com/apache/gravitino/issues/4291
### Does this PR introduce _any_ user-facing change?
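Yes. A Paimon table can now be created with a single primary key index through the existing table creation API; previously any index was rejected.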
### How was this patch tested?
Added unit tests (TestGravitinoPaimonTable) and an integration test (CatalogPaimonBaseIT).
Co-authored-by: cai can <[email protected]>
Co-authored-by: caican <[email protected]>
---
.../lakehouse/paimon/GravitinoPaimonTable.java | 113 +++++++++++++++--
.../lakehouse/paimon/PaimonCatalogOperations.java | 25 +++-
.../lakehouse/paimon/TestGravitinoPaimonTable.java | 141 ++++++++++++++++++++-
.../integration/test/CatalogPaimonBaseIT.java | 113 +++++++++++++++++
4 files changed, 370 insertions(+), 22 deletions(-)
diff --git
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java
index 6df977548..2853abbbe 100644
---
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java
+++
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java
@@ -18,12 +18,19 @@
*/
package org.apache.gravitino.catalog.lakehouse.paimon;
+import static
org.apache.gravitino.catalog.lakehouse.paimon.PaimonTablePropertiesMetadata.COMMENT;
+import static
org.apache.gravitino.dto.rel.partitioning.Partitioning.EMPTY_PARTITIONING;
import static org.apache.gravitino.meta.AuditInfo.EMPTY;
+import static org.apache.gravitino.rel.indexes.Indexes.primary;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.ToString;
@@ -32,6 +39,7 @@ import org.apache.gravitino.connector.TableOperations;
import org.apache.gravitino.rel.expressions.NamedReference;
import org.apache.gravitino.rel.expressions.transforms.Transform;
import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Index;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.DataField;
@@ -44,6 +52,9 @@ import org.apache.paimon.types.DataField;
@Getter
public class GravitinoPaimonTable extends BaseTable {
+ @VisibleForTesting
+ public static final String PAIMON_PRIMARY_KEY_INDEX_NAME =
"PAIMON_PRIMARY_KEY_INDEX";
+
private GravitinoPaimonTable() {}
@Override
@@ -58,20 +69,23 @@ public class GravitinoPaimonTable extends BaseTable {
* @return The converted Paimon table.
*/
public Schema toPaimonTableSchema() {
- Schema.Builder builder =
Schema.newBuilder().comment(comment).options(properties);
- if (partitioning != null) {
- builder.partitionKeys(
- Arrays.stream(partitioning)
- .map(
- partition -> {
- NamedReference[] references = partition.references();
- Preconditions.checkArgument(
- references.length == 1,
- "Partitioning column must be single-column, like
'a'.");
- return references[0].toString();
- })
- .collect(Collectors.toList()));
+ Schema.Builder builder = Schema.newBuilder().comment(comment);
+ if (properties == null) {
+ properties = Maps.newHashMap();
+ }
+ if (partitioning == null) {
+ partitioning = EMPTY_PARTITIONING;
}
+
+ Map<String, String> normalizedProperties = new HashMap<>(properties);
+ normalizedProperties.remove(COMMENT);
+
+ List<String> partitionKeys = getPartitionKeys(partitioning);
+ List<String> primaryKeys = getPrimaryKeysFromIndexes(indexes);
+
+ validate(primaryKeys, partitionKeys);
+
+
builder.options(normalizedProperties).primaryKey(primaryKeys).partitionKeys(partitionKeys);
for (int index = 0; index < columns.length; index++) {
DataField dataField = GravitinoPaimonColumn.toPaimonColumn(index,
columns[index]);
builder.column(dataField.name(), dataField.type(),
dataField.description());
@@ -94,6 +108,7 @@ public class GravitinoPaimonTable extends BaseTable {
.withPartitioning(toGravitinoPartitioning(table.partitionKeys()))
.withComment(table.comment().orElse(null))
.withProperties(table.options())
+ .withIndexes(constructIndexesFromPrimaryKeys(table))
.withAuditInfo(EMPTY)
.build();
}
@@ -102,6 +117,77 @@ public class GravitinoPaimonTable extends BaseTable {
return
partitionKeys.stream().map(Transforms::identity).toArray(Transform[]::new);
}
+  /**
+   * Converts Gravitino partition transforms into Paimon partition key names.
+   *
+   * @param partitioning the partition transforms; {@code null} is treated as no partitioning
+   * @return the partition column names in transform order, or an empty list when there are none
+   * @throws IllegalArgumentException if a transform does not reference exactly one column
+   */
+  private static List<String> getPartitionKeys(Transform[] partitioning) {
+    if (partitioning == null) {
+      return Collections.emptyList();
+    }
+    return Arrays.stream(partitioning)
+        .map(GravitinoPaimonTable::singlePartitionColumn)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Returns the name of the column referenced by {@code partition}, which must reference exactly
+   * one column.
+   */
+  private static String singlePartitionColumn(Transform partition) {
+    NamedReference[] refs = partition.references();
+    Preconditions.checkArgument(
+        refs.length == 1, "Partitioning column must be single-column, like 'a'.");
+    return refs[0].toString();
+  }
+
+  /**
+   * Extracts Paimon primary key column names from Gravitino indexes.
+   *
+   * <p>Paimon stores primary keys as a flat list of top-level column names. At most one index is
+   * accepted, it must be a {@link Index.IndexType#PRIMARY_KEY} index, and each of its field names
+   * must reference a single, non-nested column.
+   *
+   * @param indexes the Gravitino indexes; may be {@code null} or empty
+   * @return the primary key column names in index order, or an empty list if there is no index
+   * @throws IllegalArgumentException if more than one index is given, the index is not a primary
+   *     key, or a primary key column is nested
+   */
+  private static List<String> getPrimaryKeysFromIndexes(Index[] indexes) {
+    if (indexes == null || indexes.length == 0) {
+      return Collections.emptyList();
+    }
+
+    Preconditions.checkArgument(
+        indexes.length == 1, "Paimon only supports no more than one Index.");
+
+    Index primaryKeyIndex = indexes[0];
+    // Check the type here too: a table assembled directly through the builder must not silently
+    // turn a non-primary-key index (e.g. a unique key) into Paimon primary keys.
+    Preconditions.checkArgument(
+        primaryKeyIndex.type() == Index.IndexType.PRIMARY_KEY,
+        "Paimon only supports primary key Index.");
+
+    // Validate and flatten in a single pass; each field name must be a one-level path.
+    return Arrays.stream(primaryKeyIndex.fieldNames())
+        .map(
+            fieldName -> {
+              Preconditions.checkArgument(
+                  fieldName != null && fieldName.length == 1,
+                  "The primary key columns should not be nested.");
+              return fieldName[0];
+            })
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Builds the Gravitino index view of a Paimon table's primary keys.
+   *
+   * @param table the loaded Paimon table
+   * @return a one-element array holding a primary key index named {@link
+   *     #PAIMON_PRIMARY_KEY_INDEX_NAME}, or an empty array if the table has no primary keys
+   */
+  private static Index[] constructIndexesFromPrimaryKeys(Table table) {
+    List<String> primaryKeys = table.primaryKeys();
+    boolean hasPrimaryKeys = primaryKeys != null && !primaryKeys.isEmpty();
+    if (!hasPrimaryKeys) {
+      return new Index[0];
+    }
+    return new Index[] {
+      primary(PAIMON_PRIMARY_KEY_INDEX_NAME, constructIndexFiledNames(primaryKeys))
+    };
+  }
+
+  /**
+   * Wraps each primary key column name into a single-level field name path.
+   *
+   * @param primaryKeys the primary key column names
+   * @return one {@code String[]} of length 1 per primary key, in the same order
+   */
+  private static String[][] constructIndexFiledNames(List<String> primaryKeys) {
+    String[][] fieldNames = new String[primaryKeys.size()][];
+    for (int i = 0; i < fieldNames.length; i++) {
+      fieldNames[i] = new String[] {primaryKeys.get(i)};
+    }
+    return fieldNames;
+  }
+
+  /**
+   * Rejects primary keys that are entirely covered by the partition keys.
+   *
+   * <p>If every primary key column is also a partition column, each partition could hold only one
+   * record.
+   *
+   * @param primaryKeys the primary key column names
+   * @param partitionKeys the partition key column names
+   * @throws IllegalStateException if the primary keys are non-empty and all of them are partition
+   *     keys
+   */
+  private static void validate(List<String> primaryKeys, List<String> partitionKeys) {
+    if (primaryKeys.isEmpty()) {
+      return;
+    }
+    // Guava's %s template defers building the message until the check actually fails.
+    Preconditions.checkState(
+        !partitionKeys.containsAll(primaryKeys),
+        "Paimon Table Primary key constraint %s should not be same with partition fields %s,"
+            + " this will result in only one record in a partition.",
+        primaryKeys,
+        partitionKeys);
+  }
+
/** A builder class for constructing {@link GravitinoPaimonTable} instance.
*/
public static class Builder extends BaseTableBuilder<Builder,
GravitinoPaimonTable> {
@@ -121,6 +207,7 @@ public class GravitinoPaimonTable extends BaseTable {
paimonTable.columns = columns;
paimonTable.partitioning = partitioning;
paimonTable.properties = properties == null ? Maps.newHashMap() :
Maps.newHashMap(properties);
+ paimonTable.indexes = indexes;
paimonTable.auditInfo = auditInfo;
return paimonTable;
}
diff --git
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogOperations.java
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogOperations.java
index 57095e237..68a5ebcff 100644
---
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogOperations.java
+++
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogOperations.java
@@ -22,6 +22,7 @@ import static
org.apache.gravitino.catalog.lakehouse.paimon.GravitinoPaimonTable
import static
org.apache.gravitino.catalog.lakehouse.paimon.PaimonSchema.fromPaimonProperties;
import static org.apache.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;
import static
org.apache.gravitino.rel.expressions.transforms.Transforms.EMPTY_TRANSFORM;
+import static org.apache.gravitino.rel.indexes.Indexes.EMPTY_INDEXES;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -325,6 +326,9 @@ public class PaimonCatalogOperations implements
CatalogOperations, SupportsSchem
if (partitioning == null) {
partitioning = EMPTY_TRANSFORM;
}
+ if (indexes == null) {
+ indexes = EMPTY_INDEXES;
+ }
Preconditions.checkArgument(
Arrays.stream(partitioning)
.allMatch(
@@ -334,15 +338,13 @@ public class PaimonCatalogOperations implements
CatalogOperations, SupportsSchem
&& references[0] instanceof
NamedReference.FieldReference;
}),
"Paimon partition columns should not be nested.");
- Preconditions.checkArgument(
- sortOrders == null || sortOrders.length == 0,
- "Sort orders are not supported for Paimon in Gravitino.");
- Preconditions.checkArgument(
- indexes == null || indexes.length == 0,
- "Indexes are not supported for Paimon in Gravitino.");
Preconditions.checkArgument(
distribution == null || distribution.strategy() ==
Distributions.NONE.strategy(),
"Distribution is not supported for Paimon in Gravitino now.");
+ Preconditions.checkArgument(
+ sortOrders == null || sortOrders.length == 0,
+ "Sort orders are not supported for Paimon in Gravitino.");
+ checkPaimonIndexes(indexes);
String currentUser = currentUser();
GravitinoPaimonTable createdTable =
GravitinoPaimonTable.builder()
@@ -366,6 +368,7 @@ public class PaimonCatalogOperations implements
CatalogOperations, SupportsSchem
.withPartitioning(partitioning)
.withComment(comment)
.withProperties(properties)
+ .withIndexes(indexes)
.withAuditInfo(
AuditInfo.builder().withCreator(currentUser).withCreateTime(Instant.now()).build())
.build();
@@ -457,4 +460,14 @@ public class PaimonCatalogOperations implements
CatalogOperations, SupportsSchem
String[] levels = identifier.namespace().levels();
return NameIdentifier.of(levels[levels.length - 1], identifier.name());
}
+
+  /**
+   * Validates the indexes requested for a new Paimon table.
+   *
+   * @param indexes the requested indexes; must not be {@code null}
+   * @throws IllegalArgumentException if more than one index is given or any index is not a
+   *     primary key
+   */
+  private void checkPaimonIndexes(Index[] indexes) {
+    Preconditions.checkArgument(indexes.length <= 1, "Paimon supports no more than one Index.");
+    for (Index index : indexes) {
+      Preconditions.checkArgument(
+          index.type() == Index.IndexType.PRIMARY_KEY, "Paimon only supports primary key Index.");
+    }
+  }
}
diff --git
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/TestGravitinoPaimonTable.java
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/TestGravitinoPaimonTable.java
index e4dc306ae..a34c27957 100644
---
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/TestGravitinoPaimonTable.java
+++
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/TestGravitinoPaimonTable.java
@@ -19,9 +19,12 @@
package org.apache.gravitino.catalog.lakehouse.paimon;
import static
org.apache.gravitino.catalog.lakehouse.paimon.GravitinoPaimonColumn.fromPaimonColumn;
+import static
org.apache.gravitino.catalog.lakehouse.paimon.GravitinoPaimonTable.PAIMON_PRIMARY_KEY_INDEX_NAME;
import static
org.apache.gravitino.catalog.lakehouse.paimon.TestPaimonCatalog.PAIMON_PROPERTIES_METADATA;
import static
org.apache.gravitino.catalog.lakehouse.paimon.utils.TableOpsUtils.checkColumnCapability;
+import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
import static
org.apache.gravitino.rel.expressions.transforms.Transforms.identity;
+import static org.apache.gravitino.rel.indexes.Indexes.primary;
import com.google.common.collect.Maps;
import java.io.File;
@@ -29,6 +32,7 @@ import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -50,6 +54,7 @@ import org.apache.gravitino.rel.expressions.NamedReference;
import org.apache.gravitino.rel.expressions.distributions.Distributions;
import org.apache.gravitino.rel.expressions.sorts.SortOrder;
import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.indexes.Index;
import org.apache.gravitino.rel.types.Types;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataField;
@@ -263,6 +268,73 @@ public class TestGravitinoPaimonTable {
Assertions.assertArrayEquals(partitionKeys, loadedPartitionKeys);
}
+  /**
+   * Creates a Paimon table partitioned by col_1 with col_2 as primary key, then checks that the
+   * partitioning and primary key index survive both creation and a reload.
+   */
+  @Test
+  void testCreatePaimonPrimaryKeyTable() {
+    String paimonTableName = "test_paimon_primary_key_table";
+    NameIdentifier tableIdentifier = NameIdentifier.of(paimonSchema.name(), paimonTableName);
+    Map<String, String> properties = Maps.newHashMap();
+
+    GravitinoPaimonColumn col1 =
+        fromPaimonColumn(new DataField(0, "col_1", DataTypes.INT().notNull(), PAIMON_COMMENT));
+    GravitinoPaimonColumn col2 =
+        fromPaimonColumn(new DataField(1, "col_2", DataTypes.STRING().notNull(), PAIMON_COMMENT));
+    GravitinoPaimonColumn col3 =
+        fromPaimonColumn(new DataField(2, "col_3", DataTypes.STRING().nullable(), PAIMON_COMMENT));
+    Column[] columns = new Column[] {col1, col2, col3};
+
+    Transform[] transforms = new Transform[] {identity("col_1")};
+    String[] partitionKeys = new String[] {"col_1"};
+    Index[] indexes =
+        Collections.singletonList(
+                primary(PAIMON_PRIMARY_KEY_INDEX_NAME, new String[][] {new String[] {"col_2"}}))
+            .toArray(new Index[0]);
+
+    Table table =
+        paimonCatalogOperations.createTable(
+            tableIdentifier,
+            columns,
+            PAIMON_COMMENT,
+            properties,
+            transforms,
+            Distributions.NONE,
+            new SortOrder[0],
+            indexes);
+
+    Assertions.assertEquals(tableIdentifier.name(), table.name());
+    Assertions.assertEquals(PAIMON_COMMENT, table.comment());
+    Assertions.assertEquals(properties, table.properties());
+    Assertions.assertArrayEquals(transforms, table.partitioning());
+    Assertions.assertEquals(indexes.length, table.index().length);
+    for (int i = 0; i < indexes.length; i++) {
+      Assertions.assertEquals(indexes[i].name(), table.index()[i].name());
+      Assertions.assertEquals(indexes[i].type(), table.index()[i].type());
+      // assertEquals on arrays only compares references; compare the field names by content.
+      Assertions.assertArrayEquals(indexes[i].fieldNames(), table.index()[i].fieldNames());
+    }
+
+    Table loadedTable = paimonCatalogOperations.loadTable(tableIdentifier);
+    Assertions.assertArrayEquals(transforms, loadedTable.partitioning());
+    String[] loadedPartitionKeys =
+        Arrays.stream(loadedTable.partitioning())
+            .map(
+                transform -> {
+                  NamedReference[] references = transform.references();
+                  Assertions.assertTrue(
+                      references.length == 1
+                          && references[0] instanceof NamedReference.FieldReference);
+                  NamedReference.FieldReference fieldReference =
+                      (NamedReference.FieldReference) references[0];
+                  return fieldReference.fieldName()[0];
+                })
+            .toArray(String[]::new);
+    Assertions.assertArrayEquals(partitionKeys, loadedPartitionKeys);
+    Assertions.assertEquals(indexes.length, loadedTable.index().length);
+    for (int i = 0; i < indexes.length; i++) {
+      Assertions.assertEquals(indexes[i].name(), loadedTable.index()[i].name());
+      Assertions.assertEquals(indexes[i].type(), loadedTable.index()[i].type());
+      Assertions.assertArrayEquals(indexes[i].fieldNames(), loadedTable.index()[i].fieldNames());
+    }
+  }
+
@Test
void testDropPaimonTable() {
NameIdentifier tableIdentifier = NameIdentifier.of(paimonSchema.name(),
genRandomName());
@@ -389,7 +461,7 @@ public class TestGravitinoPaimonTable {
Assertions.assertEquals(new IntType().nullable(),
paimonTableSchema.fields().get(0).type());
Assertions.assertEquals(new DateType().nullable(),
paimonTableSchema.fields().get(1).type());
Assertions.assertEquals(
- new VarCharType(Integer.MAX_VALUE).nullable(),
paimonTableSchema.fields().get(2).type());
+ new VarCharType(Integer.MAX_VALUE).notNull(),
paimonTableSchema.fields().get(2).type());
}
@Test
@@ -444,7 +516,68 @@ public class TestGravitinoPaimonTable {
Assertions.assertEquals(new IntType().nullable(),
paimonTableSchema.fields().get(0).type());
Assertions.assertEquals(new DateType().nullable(),
paimonTableSchema.fields().get(1).type());
Assertions.assertEquals(
- new VarCharType(Integer.MAX_VALUE).nullable(),
paimonTableSchema.fields().get(2).type());
+ new VarCharType(Integer.MAX_VALUE).notNull(),
paimonTableSchema.fields().get(2).type());
+ }
+
+  /**
+   * Converts a Gravitino table partitioned by col1 with col3 as primary key into a Paimon schema
+   * and checks partition keys, fields, types and primary keys.
+   */
+  @Test
+  public void testGravitinoToPaimonTableWithPrimaryKey() {
+    Column[] columns = createColumns();
+    NameIdentifier identifier = NameIdentifier.of("test_schema", "test_primary_key_table");
+    Map<String, String> properties = Maps.newHashMap();
+
+    String partitionColumn = columns[0].name();
+    String primaryKeyColumn = columns[2].name();
+    Transform[] partitions = new Transform[] {identity(partitionColumn)};
+    List<String> partitionKeys = Collections.singletonList(partitionColumn);
+    List<String> primaryKeys = Collections.singletonList(primaryKeyColumn);
+    Index[] indexes =
+        new Index[] {
+          primary(PAIMON_PRIMARY_KEY_INDEX_NAME, new String[][] {new String[] {primaryKeyColumn}})
+        };
+
+    GravitinoPaimonColumn[] paimonColumns = new GravitinoPaimonColumn[columns.length];
+    for (int i = 0; i < columns.length; i++) {
+      Column source = columns[i];
+      checkColumnCapability(source.name(), source.defaultValue(), source.autoIncrement());
+      paimonColumns[i] =
+          GravitinoPaimonColumn.builder()
+              .withName(source.name())
+              .withType(source.dataType())
+              .withComment(source.comment())
+              .withNullable(source.nullable())
+              .withAutoIncrement(source.autoIncrement())
+              .withDefaultValue(source.defaultValue())
+              .build();
+    }
+
+    GravitinoPaimonTable gravitinoPaimonTable =
+        GravitinoPaimonTable.builder()
+            .withName(identifier.name())
+            .withColumns(paimonColumns)
+            .withPartitioning(partitions)
+            .withComment("test_table_comment")
+            .withProperties(properties)
+            .withIndexes(indexes)
+            .build();
+    Schema paimonTableSchema = gravitinoPaimonTable.toPaimonTableSchema();
+    List<DataField> fields = paimonTableSchema.fields();
+
+    Assertions.assertArrayEquals(
+        partitionKeys.toArray(new String[0]),
+        paimonTableSchema.partitionKeys().toArray(new String[0]));
+    Assertions.assertEquals(gravitinoPaimonTable.columns().length, fields.size());
+    Assertions.assertEquals(3, fields.size());
+    for (int i = 0; i < gravitinoPaimonTable.columns().length; i++) {
+      Column expected = gravitinoPaimonTable.columns()[i];
+      DataField actual = fields.get(i);
+      Assertions.assertEquals(expected.name(), actual.name());
+      Assertions.assertEquals(expected.comment(), actual.description());
+    }
+    Assertions.assertEquals(new IntType().nullable(), fields.get(0).type());
+    Assertions.assertEquals(new DateType().nullable(), fields.get(1).type());
+    Assertions.assertEquals(new VarCharType(Integer.MAX_VALUE).notNull(), fields.get(2).type());
+    Assertions.assertArrayEquals(
+        primaryKeys.toArray(new String[0]),
+        paimonTableSchema.primaryKeys().toArray(new String[0]));
   }
private static String genRandomName() {
@@ -480,7 +613,9 @@ public class TestGravitinoPaimonTable {
+ /**
+ * Creates the three test columns col1 (INT), col2 (DATE) and col3 (STRING, declared with
+ * nullable=false).
+ */
private static Column[] createColumns() {
Column col1 = Column.of("col1", Types.IntegerType.get(), "col_1_comment");
Column col2 = Column.of("col2", Types.DateType.get(), "col_2_comment");
- Column col3 = Column.of("col3", Types.StringType.get(), "col_3_comment");
+ // col3 is non-nullable; testGravitinoToPaimonTableWithPrimaryKey uses it as the primary key
+ // column and expects a NOT NULL VarChar in the resulting Paimon schema.
+ Column col3 =
+ Column.of(
+ "col3", Types.StringType.get(), "col_3_comment", false, false,
DEFAULT_VALUE_NOT_SET);
return new Column[] {col1, col2, col3};
}
}
diff --git
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java
index 087712108..cd0ca2c5b 100644
---
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java
+++
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java
@@ -18,7 +18,9 @@
*/
package org.apache.gravitino.catalog.lakehouse.paimon.integration.test;
+import static
org.apache.gravitino.catalog.lakehouse.paimon.GravitinoPaimonTable.PAIMON_PRIMARY_KEY_INDEX_NAME;
import static
org.apache.gravitino.rel.expressions.transforms.Transforms.identity;
+import static org.apache.gravitino.rel.indexes.Indexes.primary;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
@@ -60,6 +62,7 @@ import org.apache.gravitino.rel.expressions.sorts.SortOrder;
import org.apache.gravitino.rel.expressions.sorts.SortOrders;
import org.apache.gravitino.rel.expressions.transforms.Transform;
import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Index;
import org.apache.gravitino.rel.types.Types;
import org.apache.paimon.catalog.Catalog.DatabaseNotExistException;
import org.apache.paimon.catalog.Identifier;
@@ -92,6 +95,7 @@ public abstract class CatalogPaimonBaseIT extends AbstractIT {
private static final String PAIMON_COL_NAME2 = "paimon_col_name2";
private static final String PAIMON_COL_NAME3 = "paimon_col_name3";
private static final String PAIMON_COL_NAME4 = "paimon_col_name4";
+ private static final String PAIMON_COL_NAME5 = "paimon_col_name5";
private String metalakeName =
GravitinoITUtils.genRandomName("paimon_it_metalake");
private String catalogName =
GravitinoITUtils.genRandomName("paimon_it_catalog");
private String schemaName =
GravitinoITUtils.genRandomName("paimon_it_schema");
@@ -375,6 +379,115 @@ public abstract class CatalogPaimonBaseIT extends
AbstractIT {
Assertions.assertArrayEquals(partitionKeys,
schema.partitionKeys().toArray(new String[0]));
}
+  /**
+   * Creates a Paimon table with a primary key index through the Gravitino API. It then verifies
+   * the created table, the table reloaded through Gravitino, and the table read directly from the
+   * Paimon catalog.
+   */
+  @Test
+  void testCreateAndLoadPaimonPrimaryKeyTable()
+      throws org.apache.paimon.catalog.Catalog.TableNotExistException {
+    // Create table from Gravitino API: base columns plus a non-nullable primary key column.
+    Column[] baseColumns = createColumns();
+    Column col5 =
+        Column.of(
+            PAIMON_COL_NAME5,
+            Types.StringType.get(),
+            "col_5_comment",
+            false,
+            false,
+            Column.DEFAULT_VALUE_NOT_SET);
+    Column[] columns = Arrays.copyOf(baseColumns, baseColumns.length + 1);
+    columns[baseColumns.length] = col5;
+
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
+    Distribution distribution = Distributions.NONE;
+
+    Transform[] partitioning =
+        new Transform[] {identity(PAIMON_COL_NAME1), identity(PAIMON_COL_NAME3)};
+    String[] partitionKeys = new String[] {PAIMON_COL_NAME1, PAIMON_COL_NAME3};
+
+    String[] primaryKeys = new String[] {PAIMON_COL_NAME5};
+    Index[] indexes =
+        Collections.singletonList(
+                primary(
+                    PAIMON_PRIMARY_KEY_INDEX_NAME,
+                    new String[][] {new String[] {PAIMON_COL_NAME5}}))
+            .toArray(new Index[0]);
+
+    Map<String, String> properties = createProperties();
+
+    SortOrder[] sortOrders = SortOrders.NONE;
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    Table createdTable =
+        tableCatalog.createTable(
+            tableIdentifier,
+            columns,
+            table_comment,
+            properties,
+            partitioning,
+            distribution,
+            sortOrders,
+            indexes);
+    // JUnit's assertEquals takes (expected, actual); keep that order for accurate failure output.
+    Assertions.assertEquals(tableName, createdTable.name());
+    Assertions.assertEquals(table_comment, createdTable.comment());
+    Assertions.assertArrayEquals(partitioning, createdTable.partitioning());
+    Assertions.assertEquals(indexes.length, createdTable.index().length);
+    for (int i = 0; i < indexes.length; i++) {
+      Assertions.assertEquals(indexes[i].name(), createdTable.index()[i].name());
+      Assertions.assertEquals(indexes[i].type(), createdTable.index()[i].type());
+      Assertions.assertArrayEquals(indexes[i].fieldNames(), createdTable.index()[i].fieldNames());
+    }
+    Assertions.assertEquals(columns.length, createdTable.columns().length);
+    for (int i = 0; i < columns.length; i++) {
+      Assertions.assertEquals(DTOConverters.toDTO(columns[i]), createdTable.columns()[i]);
+    }
+
+    Table loadTable = tableCatalog.loadTable(tableIdentifier);
+    Assertions.assertEquals(tableName, loadTable.name());
+    Assertions.assertEquals(table_comment, loadTable.comment());
+    Assertions.assertArrayEquals(partitioning, loadTable.partitioning());
+    String[] loadedPartitionKeys =
+        Arrays.stream(loadTable.partitioning())
+            .map(
+                transform -> {
+                  NamedReference[] references = transform.references();
+                  Assertions.assertTrue(
+                      references.length == 1
+                          && references[0] instanceof NamedReference.FieldReference);
+                  NamedReference.FieldReference fieldReference =
+                      (NamedReference.FieldReference) references[0];
+                  return fieldReference.fieldName()[0];
+                })
+            .toArray(String[]::new);
+    Assertions.assertArrayEquals(partitionKeys, loadedPartitionKeys);
+    Assertions.assertEquals(indexes.length, loadTable.index().length);
+    for (int i = 0; i < indexes.length; i++) {
+      Assertions.assertEquals(indexes[i].name(), loadTable.index()[i].name());
+      Assertions.assertEquals(indexes[i].type(), loadTable.index()[i].type());
+      Assertions.assertArrayEquals(indexes[i].fieldNames(), loadTable.index()[i].fieldNames());
+    }
+    Assertions.assertEquals(columns.length, loadTable.columns().length);
+    for (int i = 0; i < columns.length; i++) {
+      Assertions.assertEquals(DTOConverters.toDTO(columns[i]), loadTable.columns()[i]);
+    }
+
+    // catalog load check
+    org.apache.paimon.table.Table table =
+        paimonCatalog.getTable(Identifier.create(schemaName, tableName));
+    Assertions.assertEquals(tableName, table.name());
+    Assertions.assertTrue(table.comment().isPresent());
+    Assertions.assertEquals(table_comment, table.comment().get());
+    Assertions.assertArrayEquals(partitionKeys, table.partitionKeys().toArray(new String[0]));
+    Assertions.assertArrayEquals(primaryKeys, table.primaryKeys().toArray(new String[0]));
+    Assertions.assertInstanceOf(FileStoreTable.class, table);
+    FileStoreTable fileStoreTable = (FileStoreTable) table;
+
+    TableSchema schema = fileStoreTable.schema();
+    Assertions.assertEquals(columns.length, schema.fields().size());
+    for (int i = 0; i < columns.length; i++) {
+      Assertions.assertEquals(columns[i].name(), schema.fieldNames().get(i));
+    }
+    Assertions.assertArrayEquals(partitionKeys, schema.partitionKeys().toArray(new String[0]));
+    Assertions.assertArrayEquals(primaryKeys, schema.primaryKeys().toArray(new String[0]));
+  }
+
@Test
void testCreateTableWithTimestampColumn()
throws org.apache.paimon.catalog.Catalog.TableNotExistException {