This is an automated email from the ASF dual-hosted git repository.
fanng pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new 9b243bf2c [#3916] feat(catalog-lakehouse-paimon): Support alter table
for Paimon Catalog (#4428)
9b243bf2c is described below
commit 9b243bf2cf4d25e4ec7984bd2cb150dab9874d79
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Aug 8 11:40:39 2024 +0800
[#3916] feat(catalog-lakehouse-paimon): Support alter table for Paimon
Catalog (#4428)
### What changes were proposed in this pull request?
Support alter table for Paimon Catalog.
### Why are the changes needed?
Fix: https://github.com/apache/gravitino/issues/3916
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New UTs and ITs.
Co-authored-by: cai can <[email protected]>
Co-authored-by: caican <[email protected]>
---
.../lakehouse/paimon/PaimonCatalogOperations.java | 78 ++++-
.../paimon/PaimonTablePropertiesMetadata.java | 16 +-
.../lakehouse/paimon/ops/PaimonCatalogOps.java | 15 +
.../lakehouse/paimon/utils/TableOpsUtils.java | 130 ++++++++
.../lakehouse/paimon/TestGravitinoPaimonTable.java | 8 +-
.../integration/test/CatalogPaimonBaseIT.java | 151 ++++++++++
.../lakehouse/paimon/ops/TestPaimonCatalogOps.java | 320 +++++++++++++++++---
.../lakehouse/paimon/utils/TestTableOpsUtils.java | 333 +++++++++++++++++++++
8 files changed, 1011 insertions(+), 40 deletions(-)
diff --git
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogOperations.java
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogOperations.java
index 68a5ebcff..99487fe87 100644
---
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogOperations.java
+++
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogOperations.java
@@ -31,7 +31,10 @@ import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.SchemaChange;
@@ -43,6 +46,7 @@ import org.apache.gravitino.connector.HasPropertyMetadata;
import org.apache.gravitino.connector.SupportsSchemas;
import org.apache.gravitino.exceptions.ConnectionFailedException;
import org.apache.gravitino.exceptions.NoSuchCatalogException;
+import org.apache.gravitino.exceptions.NoSuchColumnException;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.exceptions.NoSuchTableException;
import org.apache.gravitino.exceptions.NonEmptySchemaException;
@@ -52,6 +56,7 @@ import org.apache.gravitino.meta.AuditInfo;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.TableCatalog;
import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.TableChange.RenameTable;
import org.apache.gravitino.rel.expressions.NamedReference;
import org.apache.gravitino.rel.expressions.distributions.Distribution;
import org.apache.gravitino.rel.expressions.distributions.Distributions;
@@ -84,6 +89,8 @@ public class PaimonCatalogOperations implements
CatalogOperations, SupportsSchem
"Paimon schema (database) %s already exists.";
private static final String NO_SUCH_TABLE_EXCEPTION = "Paimon table %s does
not exist.";
private static final String TABLE_ALREADY_EXISTS_EXCEPTION = "Paimon table
%s already exists.";
+ private static final String NO_SUCH_COLUMN_EXCEPTION =
+ "Paimon column of table %s does not exist.";
/**
* Initializes the Paimon catalog operations with the provided configuration.
@@ -402,7 +409,25 @@ public class PaimonCatalogOperations implements
CatalogOperations, SupportsSchem
@Override
public GravitinoPaimonTable alterTable(NameIdentifier identifier,
TableChange... changes)
throws NoSuchTableException, IllegalArgumentException {
- throw new UnsupportedOperationException("alterTable is unsupported now for
Paimon Catalog.");
+ Optional<TableChange> renameTableOpt =
+ Arrays.stream(changes)
+ .filter(tableChange -> tableChange instanceof RenameTable)
+ .reduce((a, b) -> b);
+ if (renameTableOpt.isPresent()) {
+ String otherChanges =
+ Arrays.stream(changes)
+ .filter(tableChange -> !(tableChange instanceof RenameTable))
+ .map(String::valueOf)
+ .collect(Collectors.joining("\n"));
+ Preconditions.checkArgument(
+ StringUtils.isEmpty(otherChanges),
+ String.format(
+ "The operation to change the table name cannot be performed
together with other operations. "
+ + "The list of operations that you cannot perform includes:
\n%s",
+ otherChanges));
+ return renameTable(identifier, (RenameTable) renameTableOpt.get());
+ }
+ return internalAlterTable(identifier, changes);
}
/**
@@ -470,4 +495,55 @@ public class PaimonCatalogOperations implements
CatalogOperations, SupportsSchem
index.type() == Index.IndexType.PRIMARY_KEY,
"Paimon only supports primary key Index."));
}
+
+ /**
+ * Performs rename table change with the provided identifier.
+ *
+ * @param identifier The identifier of the table to rename.
+ * @param renameTable Table Change to modify the table name.
+ * @return The renamed {@link GravitinoPaimonTable} instance.
+ * @throws NoSuchTableException If the table with the provided identifier
does not exist.
+ * @throws IllegalArgumentException This exception will not be thrown in
this method.
+ */
+ private GravitinoPaimonTable renameTable(
+ NameIdentifier identifier, TableChange.RenameTable renameTable)
+ throws NoSuchTableException, IllegalArgumentException {
+ NameIdentifier newNnameIdentifier =
+ NameIdentifier.of(identifier.namespace(), renameTable.getNewName());
+ NameIdentifier oldIdentifier = buildPaimonNameIdentifier(identifier);
+ NameIdentifier newIdentifier =
buildPaimonNameIdentifier(newNnameIdentifier);
+ try {
+ paimonCatalogOps.renameTable(oldIdentifier.toString(),
newIdentifier.toString());
+ } catch (Catalog.TableNotExistException e) {
+ throw new NoSuchTableException(e, NO_SUCH_TABLE_EXCEPTION,
oldIdentifier);
+ } catch (Catalog.TableAlreadyExistException e) {
+ throw new TableAlreadyExistsException(e, TABLE_ALREADY_EXISTS_EXCEPTION,
newIdentifier);
+ }
+ return loadTable(newNnameIdentifier);
+ }
+
+ /**
+ * Performs alter table changes with the provided identifier according to
the specified {@link
+ * TableChange} changes.
+ *
+ * @param identifier The identifier of the table to alter.
+ * @param changes The changes to apply to the table.
+ * @return The altered {@link GravitinoPaimonTable} instance.
+ * @throws NoSuchTableException If the table with the provided identifier
does not exist.
+ * @throws IllegalArgumentException This exception will not be thrown in
this method.
+ */
+ private GravitinoPaimonTable internalAlterTable(NameIdentifier identifier,
TableChange... changes)
+ throws NoSuchTableException, IllegalArgumentException {
+ NameIdentifier paimonNameIdentifier =
buildPaimonNameIdentifier(identifier);
+ try {
+ paimonCatalogOps.alterTable(paimonNameIdentifier.toString(), changes);
+ } catch (Catalog.TableNotExistException e) {
+ throw new NoSuchTableException(e, NO_SUCH_TABLE_EXCEPTION,
paimonNameIdentifier);
+ } catch (Catalog.ColumnNotExistException e) {
+ throw new NoSuchColumnException(e, NO_SUCH_COLUMN_EXCEPTION,
paimonNameIdentifier);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return loadTable(identifier);
+ }
}
diff --git
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonTablePropertiesMetadata.java
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonTablePropertiesMetadata.java
index 80145f29d..1c57e5b2c 100644
---
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonTablePropertiesMetadata.java
+++
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonTablePropertiesMetadata.java
@@ -35,7 +35,13 @@ import org.apache.gravitino.connector.PropertyEntry;
public class PaimonTablePropertiesMetadata extends BasePropertiesMetadata {
public static final String COMMENT = "comment";
- public static final String CREATOR = "creator";
+ public static final String OWNER = "owner";
+ public static final String BUCKET_KEY = "bucket-key";
+ public static final String MERGE_ENGINE = "merge-engine";
+ public static final String SEQUENCE_FIELD = "sequence.field";
+ public static final String ROWKIND_FIELD = "rowkind.field";
+ public static final String PRIMARY_KEY = "primary-key";
+ public static final String PARTITION = "partition";
private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;
@@ -43,7 +49,13 @@ public class PaimonTablePropertiesMetadata extends
BasePropertiesMetadata {
List<PropertyEntry<?>> propertyEntries =
ImmutableList.of(
stringReservedPropertyEntry(COMMENT, "The table comment", true),
- stringReservedPropertyEntry(CREATOR, "The table creator", false));
+ stringReservedPropertyEntry(OWNER, "The table owner", false),
+ stringReservedPropertyEntry(BUCKET_KEY, "The table bucket key",
false),
+ stringReservedPropertyEntry(MERGE_ENGINE, "The table merge
engine", false),
+ stringReservedPropertyEntry(SEQUENCE_FIELD, "The table sequence
field", false),
+ stringReservedPropertyEntry(ROWKIND_FIELD, "The table rowkind
field", false),
+ stringReservedPropertyEntry(PRIMARY_KEY, "The table primary key",
false),
+ stringReservedPropertyEntry(PARTITION, "The table partition",
false));
PROPERTIES_METADATA = Maps.uniqueIndex(propertyEntries,
PropertyEntry::getName);
}
diff --git
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/ops/PaimonCatalogOps.java
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/ops/PaimonCatalogOps.java
index ed5c2de71..d009c8bb6 100644
---
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/ops/PaimonCatalogOps.java
+++
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/ops/PaimonCatalogOps.java
@@ -19,15 +19,20 @@
package org.apache.gravitino.catalog.lakehouse.paimon.ops;
import static
org.apache.gravitino.catalog.lakehouse.paimon.utils.CatalogUtils.loadCatalogBackend;
+import static
org.apache.gravitino.catalog.lakehouse.paimon.utils.TableOpsUtils.buildSchemaChanges;
import com.google.common.base.Preconditions;
import java.util.List;
import java.util.Map;
import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConfig;
+import org.apache.gravitino.rel.TableChange;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Catalog.ColumnAlreadyExistException;
+import org.apache.paimon.catalog.Catalog.ColumnNotExistException;
import org.apache.paimon.catalog.Catalog.DatabaseAlreadyExistException;
import org.apache.paimon.catalog.Catalog.DatabaseNotEmptyException;
import org.apache.paimon.catalog.Catalog.DatabaseNotExistException;
+import org.apache.paimon.catalog.Catalog.TableAlreadyExistException;
import org.apache.paimon.catalog.Catalog.TableNotExistException;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.schema.Schema;
@@ -89,6 +94,16 @@ public class PaimonCatalogOps implements AutoCloseable {
catalog.dropTable(tableIdentifier(tableName), false);
}
+ public void alterTable(String tableName, TableChange... changes)
+ throws ColumnAlreadyExistException, TableNotExistException,
ColumnNotExistException {
+ catalog.alterTable(tableIdentifier(tableName),
buildSchemaChanges(changes), false);
+ }
+
+ public void renameTable(String fromTableName, String toTableName)
+ throws TableNotExistException, TableAlreadyExistException {
+ catalog.renameTable(tableIdentifier(fromTableName),
tableIdentifier(toTableName), false);
+ }
+
private Identifier tableIdentifier(String tableName) {
return Identifier.fromString(tableName);
}
diff --git
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/utils/TableOpsUtils.java
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/utils/TableOpsUtils.java
index 86587d10e..2d8543673 100644
---
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/utils/TableOpsUtils.java
+++
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/utils/TableOpsUtils.java
@@ -18,20 +18,115 @@
*/
package org.apache.gravitino.catalog.lakehouse.paimon.utils;
+import static
org.apache.gravitino.catalog.lakehouse.paimon.utils.TypeUtils.toPaimonType;
+import static org.apache.paimon.schema.SchemaChange.addColumn;
+import static org.apache.paimon.schema.SchemaChange.dropColumn;
+import static org.apache.paimon.schema.SchemaChange.removeOption;
+import static org.apache.paimon.schema.SchemaChange.renameColumn;
+import static org.apache.paimon.schema.SchemaChange.setOption;
+import static org.apache.paimon.schema.SchemaChange.updateColumnComment;
+import static org.apache.paimon.schema.SchemaChange.updateColumnNullability;
+import static org.apache.paimon.schema.SchemaChange.updateColumnPosition;
+import static org.apache.paimon.schema.SchemaChange.updateColumnType;
+import static org.apache.paimon.schema.SchemaChange.updateComment;
+
+import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.gravitino.catalog.lakehouse.paimon.ops.PaimonCatalogOps;
import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.TableChange.AddColumn;
+import org.apache.gravitino.rel.TableChange.After;
+import org.apache.gravitino.rel.TableChange.ColumnChange;
+import org.apache.gravitino.rel.TableChange.ColumnPosition;
+import org.apache.gravitino.rel.TableChange.Default;
+import org.apache.gravitino.rel.TableChange.DeleteColumn;
+import org.apache.gravitino.rel.TableChange.First;
+import org.apache.gravitino.rel.TableChange.RemoveProperty;
+import org.apache.gravitino.rel.TableChange.RenameColumn;
+import org.apache.gravitino.rel.TableChange.SetProperty;
+import org.apache.gravitino.rel.TableChange.UpdateColumnComment;
+import org.apache.gravitino.rel.TableChange.UpdateColumnNullability;
+import org.apache.gravitino.rel.TableChange.UpdateColumnPosition;
+import org.apache.gravitino.rel.TableChange.UpdateColumnType;
+import org.apache.gravitino.rel.TableChange.UpdateComment;
import org.apache.gravitino.rel.expressions.Expression;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaChange.Move;
/** Utilities of {@link PaimonCatalogOps} to support table operation. */
public class TableOpsUtils {
+ public static final Joiner DOT = Joiner.on(".");
+
public static void checkColumnCapability(
String fieldName, Expression defaultValue, boolean autoIncrement) {
checkColumnDefaultValue(fieldName, defaultValue);
checkColumnAutoIncrement(fieldName, autoIncrement);
}
+ public static List<SchemaChange> buildSchemaChanges(TableChange...
tableChanges)
+ throws UnsupportedOperationException {
+ List<SchemaChange> schemaChanges = new ArrayList<>();
+ for (TableChange tableChange : tableChanges) {
+ schemaChanges.add(buildSchemaChange(tableChange));
+ }
+ return schemaChanges;
+ }
+
+ public static SchemaChange buildSchemaChange(TableChange tableChange)
+ throws UnsupportedOperationException {
+ if (tableChange instanceof ColumnChange) {
+ if (tableChange instanceof AddColumn) {
+ AddColumn addColumn = (AddColumn) tableChange;
+ String fieldName = getfieldName(addColumn);
+ checkColumnCapability(fieldName, addColumn.getDefaultValue(),
addColumn.isAutoIncrement());
+ return addColumn(
+ fieldName,
+ toPaimonType(addColumn.getDataType()).copy(addColumn.isNullable()),
+ addColumn.getComment(),
+ move(fieldName, addColumn.getPosition()));
+ } else if (tableChange instanceof DeleteColumn) {
+ return dropColumn(getfieldName((DeleteColumn) tableChange));
+ } else if (tableChange instanceof RenameColumn) {
+ RenameColumn renameColumn = ((RenameColumn) tableChange);
+ return renameColumn(getfieldName(renameColumn),
renameColumn.getNewName());
+ } else if (tableChange instanceof UpdateColumnComment) {
+ UpdateColumnComment updateColumnComment = (UpdateColumnComment)
tableChange;
+ return updateColumnComment(
+ getfieldName(updateColumnComment),
updateColumnComment.getNewComment());
+ } else if (tableChange instanceof UpdateColumnNullability) {
+ UpdateColumnNullability updateColumnNullability =
(UpdateColumnNullability) tableChange;
+ return updateColumnNullability(
+ getfieldName(updateColumnNullability),
updateColumnNullability.nullable());
+ } else if (tableChange instanceof UpdateColumnPosition) {
+ UpdateColumnPosition updateColumnPosition = (UpdateColumnPosition)
tableChange;
+ Preconditions.checkArgument(
+ !(updateColumnPosition.getPosition() instanceof Default),
+ "Default position is not supported for Paimon update column
position.");
+ return updateColumnPosition(
+ move(getfieldName(updateColumnPosition),
updateColumnPosition.getPosition()));
+ } else if (tableChange instanceof UpdateColumnType) {
+ UpdateColumnType updateColumnType = (UpdateColumnType) tableChange;
+ return updateColumnType(
+ getfieldName(updateColumnType),
toPaimonType(updateColumnType.getNewDataType()));
+ }
+ } else if (tableChange instanceof UpdateComment) {
+ return updateComment(((UpdateComment) tableChange).getNewComment());
+ } else if (tableChange instanceof SetProperty) {
+ SetProperty setProperty = ((SetProperty) tableChange);
+ return setOption(setProperty.getProperty(), setProperty.getValue());
+ } else if (tableChange instanceof RemoveProperty) {
+ RemoveProperty removeProperty = (RemoveProperty) tableChange;
+ return removeOption(removeProperty.getProperty());
+ }
+ throw new UnsupportedOperationException(
+ String.format(
+ "Paimon does not support %s table change.",
tableChange.getClass().getSimpleName()));
+ }
+
private static void checkColumnDefaultValue(String fieldName, Expression
defaultValue) {
Preconditions.checkArgument(
defaultValue.equals(Column.DEFAULT_VALUE_NOT_SET),
@@ -46,4 +141,39 @@ public class TableOpsUtils {
String.format(
"Paimon does not support auto increment column. Illegal column:
%s.", fieldName));
}
+
+ private static void checkNestedColumn(String[] fieldNames) {
+ Preconditions.checkArgument(
+ fieldNames.length == 1,
+ String.format(
+ "Paimon does not support update nested column. Illegal column:
%s.",
+ getfieldName(fieldNames)));
+ }
+
+ public static String[] getfieldName(String fieldName) {
+ return new String[] {fieldName};
+ }
+
+ public static String getfieldName(String[] fieldName) {
+ return DOT.join(fieldName);
+ }
+
+ private static String getfieldName(ColumnChange columnChange) {
+ return getfieldName(columnChange.fieldName());
+ }
+
+ private static Move move(String fieldName, ColumnPosition columnPosition)
+ throws UnsupportedOperationException {
+ if (columnPosition instanceof After) {
+ return Move.after(fieldName, ((After) columnPosition).getColumn());
+ } else if (columnPosition instanceof Default) {
+ return null;
+ } else if (columnPosition instanceof First) {
+ return Move.first(fieldName);
+ }
+ throw new UnsupportedOperationException(
+ String.format(
+ "Paimon does not support %s column position.",
+ columnPosition.getClass().getSimpleName()));
+ }
}
diff --git
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/TestGravitinoPaimonTable.java
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/TestGravitinoPaimonTable.java
index a34c27957..3a522822a 100644
---
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/TestGravitinoPaimonTable.java
+++
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/TestGravitinoPaimonTable.java
@@ -384,7 +384,13 @@ public class TestGravitinoPaimonTable {
initBackendCatalogProperties(), entity.toCatalogInfo(),
PAIMON_PROPERTIES_METADATA);
Map<String, String> map = Maps.newHashMap();
map.put(PaimonTablePropertiesMetadata.COMMENT, "test");
- map.put(PaimonTablePropertiesMetadata.CREATOR, "test");
+ map.put(PaimonTablePropertiesMetadata.OWNER, "test");
+ map.put(PaimonTablePropertiesMetadata.BUCKET_KEY, "test");
+ map.put(PaimonTablePropertiesMetadata.MERGE_ENGINE, "test");
+ map.put(PaimonTablePropertiesMetadata.SEQUENCE_FIELD, "test");
+ map.put(PaimonTablePropertiesMetadata.ROWKIND_FIELD, "test");
+ map.put(PaimonTablePropertiesMetadata.PRIMARY_KEY, "test");
+ map.put(PaimonTablePropertiesMetadata.PARTITION, "test");
for (Map.Entry<String, String> entry : map.entrySet()) {
HashMap<String, String> properties =
new HashMap<String, String>() {
diff --git
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java
index cd0ca2c5b..a8f321f42 100644
---
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java
+++
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java
@@ -23,6 +23,7 @@ import static
org.apache.gravitino.rel.expressions.transforms.Transforms.identit
import static org.apache.gravitino.rel.indexes.Indexes.primary;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
@@ -55,9 +56,11 @@ import
org.apache.gravitino.integration.test.util.GravitinoITUtils;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.Table;
import org.apache.gravitino.rel.TableCatalog;
+import org.apache.gravitino.rel.TableChange;
import org.apache.gravitino.rel.expressions.NamedReference;
import org.apache.gravitino.rel.expressions.distributions.Distribution;
import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.literals.Literals;
import org.apache.gravitino.rel.expressions.sorts.SortOrder;
import org.apache.gravitino.rel.expressions.sorts.SortOrders;
import org.apache.gravitino.rel.expressions.transforms.Transform;
@@ -96,6 +99,7 @@ public abstract class CatalogPaimonBaseIT extends AbstractIT {
private static final String PAIMON_COL_NAME3 = "paimon_col_name3";
private static final String PAIMON_COL_NAME4 = "paimon_col_name4";
private static final String PAIMON_COL_NAME5 = "paimon_col_name5";
+ private static final String alertTableName = "alert_table_name";
private String metalakeName =
GravitinoITUtils.genRandomName("paimon_it_metalake");
private String catalogName =
GravitinoITUtils.genRandomName("paimon_it_catalog");
private String schemaName =
GravitinoITUtils.genRandomName("paimon_it_schema");
@@ -605,6 +609,153 @@ public abstract class CatalogPaimonBaseIT extends
AbstractIT {
Assertions.assertEquals(0, paimonCatalog.listTables(schemaName).size());
}
+ @Test
+ public void testAlterPaimonTable() {
+ Column[] columns = createColumns();
+ catalog
+ .asTableCatalog()
+ .createTable(
+ NameIdentifier.of(schemaName, tableName), columns, table_comment,
createProperties());
+
+ // The RenameTable operation cannot be performed together with other
operations.
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ catalog
+ .asTableCatalog()
+ .alterTable(
+ NameIdentifier.of(schemaName, tableName),
+ TableChange.rename(alertTableName),
+ TableChange.updateComment(table_comment + "_new")));
+
+ // rename table.
+ catalog
+ .asTableCatalog()
+ .alterTable(NameIdentifier.of(schemaName, tableName),
TableChange.rename(alertTableName));
+
+ // other operations.
+ catalog
+ .asTableCatalog()
+ .alterTable(
+ NameIdentifier.of(schemaName, alertTableName),
+ TableChange.updateComment(table_comment + "_new"),
+ TableChange.setProperty("key2", "val2_new"),
+ TableChange.removeProperty("key1"),
+ TableChange.addColumn(
+ new String[] {"paimon_col_name5_for_add"},
Types.StringType.get()),
+ TableChange.renameColumn(new String[] {PAIMON_COL_NAME2},
"paimon_col_name2_new"),
+ TableChange.updateColumnComment(new String[] {PAIMON_COL_NAME1},
"comment_new"),
+ TableChange.updateColumnType(new String[] {PAIMON_COL_NAME1},
Types.StringType.get()),
+ TableChange.updateColumnNullability(new String[]
{PAIMON_COL_NAME1}, false));
+
+ Table table =
catalog.asTableCatalog().loadTable(NameIdentifier.of(schemaName,
alertTableName));
+ Assertions.assertEquals(alertTableName, table.name());
+ Assertions.assertEquals("val2_new", table.properties().get("key2"));
+
+ Assertions.assertEquals(PAIMON_COL_NAME1, table.columns()[0].name());
+ Assertions.assertEquals(Types.StringType.get(),
table.columns()[0].dataType());
+ Assertions.assertEquals("comment_new", table.columns()[0].comment());
+ Assertions.assertFalse(table.columns()[0].nullable());
+
+ Assertions.assertEquals("paimon_col_name2_new", table.columns()[1].name());
+ Assertions.assertEquals(Types.DateType.get(),
table.columns()[1].dataType());
+ Assertions.assertEquals("col_2_comment", table.columns()[1].comment());
+
+ Assertions.assertEquals(PAIMON_COL_NAME3, table.columns()[2].name());
+ Assertions.assertEquals(Types.StringType.get(),
table.columns()[2].dataType());
+ Assertions.assertEquals("col_3_comment", table.columns()[2].comment());
+
+ Assertions.assertEquals(PAIMON_COL_NAME4, table.columns()[3].name());
+ Assertions.assertEquals(columns[3].dataType(),
table.columns()[3].dataType());
+ Assertions.assertEquals("col_4_comment", table.columns()[3].comment());
+
+ Assertions.assertEquals("paimon_col_name5_for_add",
table.columns()[4].name());
+ Assertions.assertEquals(Types.StringType.get(),
table.columns()[4].dataType());
+ Assertions.assertNull(table.columns()[4].comment());
+
+ // test add column with exceptions
+ // 1. with column default value
+ TableChange withDefaultValue =
+ TableChange.addColumn(
+ new String[] {"newColumn"}, Types.ByteType.get(), "comment",
Literals.NULL);
+ RuntimeException exception =
+ Assertions.assertThrows(
+ RuntimeException.class,
+ () ->
+ catalog
+ .asTableCatalog()
+ .alterTable(NameIdentifier.of(schemaName, alertTableName),
withDefaultValue));
+ Assertions.assertTrue(
+ exception
+ .getMessage()
+ .contains(
+ "Paimon set column default value through table properties
instead of column info"));
+
+ // 2. with column autoIncrement
+ TableChange withAutoIncrement =
+ TableChange.addColumn(
+ new String[] {"newColumn"}, Types.ByteType.get(), "comment", null,
true, true);
+ exception =
+ Assertions.assertThrows(
+ RuntimeException.class,
+ () ->
+ catalog
+ .asTableCatalog()
+ .alterTable(NameIdentifier.of(schemaName, alertTableName),
withAutoIncrement));
+ Assertions.assertTrue(
+ exception.getMessage().contains("Paimon does not support auto
increment column"));
+
+ // update column position
+ Column col1 = Column.of("name", Types.StringType.get(), "comment");
+ Column col2 = Column.of("address", Types.StringType.get(), "comment");
+ Column col3 = Column.of("date_of_birth", Types.DateType.get(), "comment");
+
+ Column[] newColumns = new Column[] {col1, col2, col3};
+ NameIdentifier tableIdentifier =
+ NameIdentifier.of(schemaName,
GravitinoITUtils.genRandomName("PaimonAlterTableIT"));
+ catalog
+ .asTableCatalog()
+ .createTable(
+ tableIdentifier,
+ newColumns,
+ table_comment,
+ ImmutableMap.of(),
+ Transforms.EMPTY_TRANSFORM,
+ Distributions.NONE,
+ new SortOrder[0],
+ new Index[0]);
+
+ catalog
+ .asTableCatalog()
+ .alterTable(
+ tableIdentifier,
+ TableChange.updateColumnPosition(
+ new String[] {col1.name()},
TableChange.ColumnPosition.after(col2.name())),
+ TableChange.updateColumnPosition(
+ new String[] {col3.name()},
TableChange.ColumnPosition.first()));
+
+ Table updateColumnPositionTable =
catalog.asTableCatalog().loadTable(tableIdentifier);
+
+ Column[] updateCols = updateColumnPositionTable.columns();
+ Assertions.assertEquals(3, updateCols.length);
+ Assertions.assertEquals(col3.name(), updateCols[0].name());
+ Assertions.assertEquals(col2.name(), updateCols[1].name());
+ Assertions.assertEquals(col1.name(), updateCols[2].name());
+
+ // delete column
+ Assertions.assertDoesNotThrow(
+ () ->
+ catalog
+ .asTableCatalog()
+ .alterTable(
+ tableIdentifier,
+ TableChange.deleteColumn(new String[] {col3.name()}, true),
+ TableChange.deleteColumn(new String[] {col2.name()},
true)));
+ Table delColTable = catalog.asTableCatalog().loadTable(tableIdentifier);
+ Assertions.assertEquals(1, delColTable.columns().length);
+ Assertions.assertEquals(col1.name(), delColTable.columns()[0].name());
+ }
+
@Test
void testOperationDataOfPaimonTable() {
Column[] columns = createColumns();
diff --git
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/ops/TestPaimonCatalogOps.java
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/ops/TestPaimonCatalogOps.java
index d0534c2b3..dbae9c713 100644
---
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/ops/TestPaimonCatalogOps.java
+++
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/ops/TestPaimonCatalogOps.java
@@ -18,24 +18,57 @@
*/
package org.apache.gravitino.catalog.lakehouse.paimon.ops;
+import static
org.apache.gravitino.catalog.lakehouse.paimon.utils.TableOpsUtils.getfieldName;
+import static
org.apache.gravitino.catalog.lakehouse.paimon.utils.TypeUtils.toPaimonType;
+import static org.apache.gravitino.rel.TableChange.ColumnPosition.after;
+import static org.apache.gravitino.rel.TableChange.ColumnPosition.defaultPos;
+import static org.apache.gravitino.rel.TableChange.ColumnPosition.first;
+import static org.apache.gravitino.rel.TableChange.addColumn;
+import static org.apache.gravitino.rel.TableChange.deleteColumn;
+import static org.apache.gravitino.rel.TableChange.removeProperty;
+import static org.apache.gravitino.rel.TableChange.renameColumn;
+import static org.apache.gravitino.rel.TableChange.setProperty;
+import static org.apache.gravitino.rel.TableChange.updateColumnComment;
+import static org.apache.gravitino.rel.TableChange.updateColumnNullability;
+import static org.apache.gravitino.rel.TableChange.updateColumnPosition;
+import static org.apache.gravitino.rel.TableChange.updateColumnType;
+import static org.apache.gravitino.rel.TableChange.updateComment;
import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.io.File;
+import java.util.List;
import java.util.Map;
+import java.util.function.Consumer;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import
org.apache.gravitino.catalog.lakehouse.paimon.PaimonCatalogPropertiesMetadata;
import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConfig;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.TableChange.ColumnPosition;
+import org.apache.gravitino.rel.TableChange.UpdateColumnComment;
+import org.apache.gravitino.rel.TableChange.UpdateComment;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.rel.types.Types;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Catalog.ColumnAlreadyExistException;
+import org.apache.paimon.catalog.Catalog.ColumnNotExistException;
+import org.apache.paimon.catalog.Catalog.DatabaseNotExistException;
+import org.apache.paimon.catalog.Catalog.TableAlreadyExistException;
import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaChange.AddColumn;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.DateType;
import org.apache.paimon.types.IntType;
@@ -68,6 +101,7 @@ public class TestPaimonCatalogOps {
new PaimonConfig(
ImmutableMap.of(PaimonCatalogPropertiesMetadata.WAREHOUSE,
warehouse.getPath())));
createDatabase();
+ createTable();
}
@AfterEach
@@ -79,41 +113,10 @@ public class TestPaimonCatalogOps {
}
@Test
- void testTableOperations() throws Exception {
+ void testLoadListAndDropTableOperations() throws Exception {
// list tables
Assertions.assertEquals(
- 0,
paimonCatalogOps.listTables(IDENTIFIER.namespace().toString()).size());
-
- // create table
- Pair<String, Schema> tableInfo =
- Pair.of(
- IDENTIFIER.toString(),
- Schema.newBuilder()
- .column("col_1", DataTypes.INT().notNull(),
IntType.class.getSimpleName())
- .column("col_2", DataTypes.STRING(),
VarCharType.class.getSimpleName())
- .column("col_3", DataTypes.STRING().notNull(),
VarCharType.class.getSimpleName())
- .column(
- "col_4",
- DataTypes.ARRAY(
- RowType.builder()
- .field(
- "sub_col_1",
- DataTypes.DATE(),
- RowType.class.getSimpleName() +
DateType.class.getSimpleName())
- .field(
- "sub_col_2",
- DataTypes.MAP(DataTypes.STRING(),
DataTypes.INT()),
- RowType.class.getSimpleName() +
MapType.class.getSimpleName())
- .field(
- "sub_col_3",
- DataTypes.TIMESTAMP().notNull(),
- RowType.class.getSimpleName() +
TimestampType.class.getSimpleName())
- .build()),
- ArrayType.class.getSimpleName())
- .comment(COMMENT)
- .options(OPTIONS)
- .build());
- paimonCatalogOps.createTable(tableInfo.getKey(), tableInfo.getValue());
+ 1,
paimonCatalogOps.listTables(IDENTIFIER.namespace().toString()).size());
// load table
Table table = paimonCatalogOps.loadTable(IDENTIFIER.toString());
@@ -149,8 +152,6 @@ public class TestPaimonCatalogOps {
assertEquals(COMMENT, table.comment().get());
assertEquals(OPTIONS.get(BUCKET.key()), table.options().get(BUCKET.key()));
- // TODO: alter table is unsupported now.
-
// drop table
Assertions.assertDoesNotThrow(() ->
paimonCatalogOps.dropTable(IDENTIFIER.toString()));
Assertions.assertThrowsExactly(
@@ -162,10 +163,225 @@ public class TestPaimonCatalogOps {
0,
paimonCatalogOps.listTables(IDENTIFIER.namespace().toString()).size());
// create a new table to make database not empty to test drop database
cascade
- paimonCatalogOps.createTable(tableInfo.getKey(), tableInfo.getValue());
+ createTable();
Assertions.assertNotNull(paimonCatalogOps.loadTable(IDENTIFIER.toString()));
}
+ @Test
+ void testAddColumn() throws Exception {
+ // Test AddColumn after column.
+ assertAddColumn(5, Types.BooleanType.get(), after("col_2"), 2);
+ // Test AddColumn first column.
+ assertAddColumn(6, Types.FloatType.get(), first(), 0);
+ // Test AddColumn last column.
+ assertAddColumn(7, Types.DoubleType.get(), defaultPos(), 6);
+ assertAddColumn(8, Types.DoubleType.get(), null, 7);
+ // Test NullPointerException with AddColumn for after non-existent column.
+ assertThrowsExactly(
+ NullPointerException.class,
+ () -> assertAddColumn(9, Types.LongType.get(), after("col_10"), null));
+ }
+
+ @Test
+ void testUpdateColumnComment() throws Exception {
+ assertAlterTable(
+ table -> {
+ DataField dataField = table.rowType().getFields().get(0);
+ assertEquals("col_1", dataField.name());
+ assertEquals(UpdateColumnComment.class.getSimpleName(),
dataField.description());
+ },
+ updateColumnComment(getfieldName("col_1"),
UpdateColumnComment.class.getSimpleName()));
+ assertColumnNotExist(
+ updateColumnComment(getfieldName("col_10"),
UpdateComment.class.getSimpleName()));
+ }
+
+ @Test
+ void testUpdateColumnNullability() throws Exception {
+ assertAlterTable(
+ table -> {
+ DataField dataField = table.rowType().getFields().get(1);
+ assertEquals("col_2", dataField.name());
+ assertFalse(dataField.type().isNullable());
+ },
+ updateColumnNullability(getfieldName("col_2"), false));
+ assertColumnNotExist(updateColumnNullability(getfieldName("col_5"), true));
+ }
+
+ @Test
+ void testUpdateColumnPosition() throws Exception {
+ // Test UpdateColumnPosition after column.
+ assertUpdateColumnPosition(3, after("col_1"), 0, 2, 1, 3);
+ // Test UpdateColumnPosition first column.
+ assertUpdateColumnPosition(4, first(), 1, 3, 2, 0);
+ // Test NullPointerException with UpdateColumnPosition for non-existent
column.
+ assertThrowsExactly(
+ IllegalArgumentException.class, () -> assertUpdateColumnPosition(5,
defaultPos()));
+ // Test NullPointerException with UpdateColumnPosition for after
non-existent column.
+ assertThrowsExactly(
+ NullPointerException.class, () -> assertUpdateColumnPosition(1,
after("col_5")));
+ }
+
+ @Test
+ void testUpdateColumnType() throws Exception {
+ assertAlterTable(
+ table -> {
+ DataField dataField = table.rowType().getFields().get(0);
+ assertEquals("col_1", dataField.name());
+ assertEquals(DataTypes.BIGINT(), dataField.type());
+ },
+ updateColumnType(getfieldName("col_1"), Types.LongType.get()));
+ assertColumnNotExist(updateColumnType(getfieldName("col_5"),
Types.ShortType.get()));
+ assertThrowsExactly(
+ IllegalStateException.class,
+ () ->
+ assertAlterTable(
+ table -> {}, updateColumnType(getfieldName("col_1"),
Types.DateType.get())));
+ assertThrowsExactly(
+ IllegalStateException.class,
+ () ->
+ assertAlterTable(
+ table -> {}, updateColumnType(getfieldName("col_4"),
Types.LongType.get())));
+ }
+
+ @Test
+ void testRenameColumn() throws Exception {
+ assertAlterTable(
+ table -> {
+ List<String> fieldNames = table.rowType().getFieldNames();
+ assertFalse(fieldNames.contains("col2"));
+ assertEquals("col_5", fieldNames.get(1));
+ assertEquals(4, fieldNames.size());
+ },
+ renameColumn(getfieldName("col_2"), "col_5"));
+ assertColumnNotExist(renameColumn(getfieldName("col_6"), "col_7"));
+ assertThrowsExactly(
+ ColumnAlreadyExistException.class,
+ () -> assertAlterTable(table -> {},
renameColumn(getfieldName("col_1"), "col_4")));
+ }
+
+ @Test
+ void testDeleteColumn() throws Exception {
+ assertAlterTable(
+ table -> {
+ List<String> fieldNames = table.rowType().getFieldNames();
+ assertFalse(fieldNames.contains("col_2"));
+ assertEquals("col_3", fieldNames.get(1));
+ assertEquals(3, fieldNames.size());
+ },
+ deleteColumn(getfieldName("col_2"), true));
+ assertColumnNotExist(deleteColumn(getfieldName("col_5"), true));
+ assertColumnNotExist(deleteColumn(getfieldName("col_5"), false));
+ }
+
+ @Test
+ void testUpdateComment() throws Exception {
+ assertAlterTable(
+ table -> {
+ assertTrue(table.comment().isPresent());
+ assertEquals(UpdateComment.class.getSimpleName(),
table.comment().get());
+ },
+ updateComment(UpdateComment.class.getSimpleName()));
+ }
+
+ @Test
+ void testSetAndRemoveProperty() throws Exception {
+ String propertyKey = "test_property_key_1";
+ assertFalse(
+
paimonCatalogOps.loadTable(IDENTIFIER.toString()).options().containsKey(propertyKey));
+ // Test SetProperty with non-existent property.
+ String propertyValue = "test_property_value_1";
+ assertAlterTable(
+ table -> {
+ Map<String, String> options = table.options();
+ assertTrue(options.containsKey(propertyKey));
+ assertEquals(propertyValue, options.get(propertyKey));
+ },
+ setProperty(propertyKey, propertyValue));
+ // Test SetProperty with overwrite existing property.
+ String newPropertyValue = "test_property_value_2";
+ assertAlterTable(
+ table -> {
+ Map<String, String> options = table.options();
+ assertTrue(options.containsKey(propertyKey));
+ assertEquals(newPropertyValue, options.get(propertyKey));
+ },
+ setProperty(propertyKey, newPropertyValue));
+ // Test RemoveProperty with existing property.
+ assertAlterTable(
+ table -> assertFalse(table.options().containsKey(propertyKey)),
+ removeProperty(propertyKey));
+ // Test RemoveProperty with non-existent property.
+ assertDoesNotThrow(() -> assertAlterTable(table -> {},
removeProperty(propertyKey)));
+ }
+
+ @Test
+ void testMultipleAlterTable() throws Exception {
+ assertAlterTable(
+ table -> {
+ List<String> fieldNames = table.rowType().getFieldNames();
+ assertEquals("col_5", fieldNames.get(0));
+ assertFalse(fieldNames.contains("col_2"));
+ assertEquals(3, fieldNames.size());
+ Map<String, String> options = table.options();
+ assertTrue(options.containsKey("test_property_key"));
+ assertEquals("test_property_value",
options.get("test_property_key"));
+ },
+ renameColumn(getfieldName("col_1"), "col_5"),
+ deleteColumn(getfieldName("col_2"), true),
+ setProperty("test_property_key", "test_property_value"));
+ }
+
+ private void assertUpdateColumnPosition(int column, ColumnPosition
columnPosition, int... fields)
+ throws Exception {
+ String columnName = "col_" + column;
+ assertAlterTable(
+ table -> {
+ List<String> fieldNames = table.rowType().getFieldNames();
+ assertEquals("col_1", fieldNames.get(fields[0]));
+ assertEquals("col_2", fieldNames.get(fields[1]));
+ assertEquals("col_3", fieldNames.get(fields[2]));
+ assertEquals("col_4", fieldNames.get(fields[3]));
+ },
+ updateColumnPosition(getfieldName(columnName), columnPosition));
+ }
+
+ private void assertAddColumn(int column, Type type, ColumnPosition
columnPosition, Integer field)
+ throws Exception {
+ String columnName = "col_" + column;
+ assertAlterTable(
+ table -> {
+ DataField dataField = table.rowType().getFields().get(field);
+ assertEquals(columnName, dataField.name());
+ assertEquals(toPaimonType(type), dataField.type());
+ assertEquals(AddColumn.class.getSimpleName(),
dataField.description());
+ assertTrue(dataField.type().isNullable());
+ },
+ addColumn(
+ getfieldName(columnName),
+ type,
+ SchemaChange.AddColumn.class.getSimpleName(),
+ columnPosition,
+ true,
+ false));
+ }
+
+ private void assertColumnNotExist(TableChange tableChange) {
+ assertThrowsExactly(
+ ColumnNotExistException.class, () -> assertAlterTable(table -> {},
tableChange));
+ }
+
+ private void assertAlterTable(Consumer<Table> consumer, TableChange...
tableChanges)
+ throws Exception {
+ consumer.accept(alterTable(tableChanges));
+ }
+
+ private Table alterTable(TableChange... tableChanges)
+ throws Catalog.ColumnAlreadyExistException,
Catalog.TableNotExistException,
+ Catalog.ColumnNotExistException {
+ paimonCatalogOps.alterTable(IDENTIFIER.toString(), tableChanges);
+ return paimonCatalogOps.loadTable(IDENTIFIER.toString());
+ }
+
private void createDatabase() throws Exception {
// list databases
assertEquals(0, paimonCatalogOps.listDatabases().size());
@@ -177,6 +393,38 @@ public class TestPaimonCatalogOps {
assertNotNull(paimonCatalogOps.loadDatabase(DATABASE));
}
+ private void createTable() throws TableAlreadyExistException,
DatabaseNotExistException {
+ Pair<String, Schema> tableInfo =
+ Pair.of(
+ IDENTIFIER.toString(),
+ Schema.newBuilder()
+ .column("col_1", DataTypes.INT().notNull(),
IntType.class.getSimpleName())
+ .column("col_2", DataTypes.STRING(),
VarCharType.class.getSimpleName())
+ .column("col_3", DataTypes.STRING().notNull(),
VarCharType.class.getSimpleName())
+ .column(
+ "col_4",
+ DataTypes.ARRAY(
+ RowType.builder()
+ .field(
+ "sub_col_1",
+ DataTypes.DATE(),
+ RowType.class.getSimpleName() +
DateType.class.getSimpleName())
+ .field(
+ "sub_col_2",
+ DataTypes.MAP(DataTypes.STRING(),
DataTypes.INT()),
+ RowType.class.getSimpleName() +
MapType.class.getSimpleName())
+ .field(
+ "sub_col_3",
+ DataTypes.TIMESTAMP().notNull(),
+ RowType.class.getSimpleName() +
TimestampType.class.getSimpleName())
+ .build()),
+ ArrayType.class.getSimpleName())
+ .comment(COMMENT)
+ .options(OPTIONS)
+ .build());
+ paimonCatalogOps.createTable(tableInfo.getKey(), tableInfo.getValue());
+ }
+
private void dropDatabase() throws Exception {
Assertions.assertEquals(1, paimonCatalogOps.listDatabases().size());
Assertions.assertEquals(1, paimonCatalogOps.listTables(DATABASE).size());
diff --git
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/utils/TestTableOpsUtils.java
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/utils/TestTableOpsUtils.java
new file mode 100644
index 000000000..d82a2bd31
--- /dev/null
+++
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/utils/TestTableOpsUtils.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.lakehouse.paimon.utils;
+
+import static
org.apache.gravitino.catalog.lakehouse.paimon.utils.TableOpsUtils.buildSchemaChange;
+import static
org.apache.gravitino.catalog.lakehouse.paimon.utils.TableOpsUtils.getfieldName;
+import static org.apache.gravitino.rel.TableChange.ColumnPosition.after;
+import static org.apache.gravitino.rel.TableChange.ColumnPosition.defaultPos;
+import static org.apache.gravitino.rel.TableChange.addColumn;
+import static org.apache.gravitino.rel.TableChange.addIndex;
+import static org.apache.gravitino.rel.TableChange.deleteColumn;
+import static org.apache.gravitino.rel.TableChange.deleteIndex;
+import static org.apache.gravitino.rel.TableChange.removeProperty;
+import static org.apache.gravitino.rel.TableChange.rename;
+import static org.apache.gravitino.rel.TableChange.renameColumn;
+import static org.apache.gravitino.rel.TableChange.setProperty;
+import static org.apache.gravitino.rel.TableChange.updateColumnAutoIncrement;
+import static org.apache.gravitino.rel.TableChange.updateColumnComment;
+import static org.apache.gravitino.rel.TableChange.updateColumnDefaultValue;
+import static org.apache.gravitino.rel.TableChange.updateColumnNullability;
+import static org.apache.gravitino.rel.TableChange.updateColumnPosition;
+import static org.apache.gravitino.rel.TableChange.updateColumnType;
+import static org.apache.gravitino.rel.TableChange.updateComment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Arrays;
+import java.util.function.Consumer;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.indexes.Index.IndexType;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.rel.types.Types.DoubleType;
+import org.apache.gravitino.rel.types.Types.FloatType;
+import org.apache.gravitino.rel.types.Types.IntegerType;
+import org.apache.gravitino.rel.types.Types.ListType;
+import org.apache.gravitino.rel.types.Types.MapType;
+import org.apache.gravitino.rel.types.Types.StringType;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaChange.AddColumn;
+import org.apache.paimon.schema.SchemaChange.DropColumn;
+import org.apache.paimon.schema.SchemaChange.Move.MoveType;
+import org.apache.paimon.schema.SchemaChange.RemoveOption;
+import org.apache.paimon.schema.SchemaChange.RenameColumn;
+import org.apache.paimon.schema.SchemaChange.SetOption;
+import org.apache.paimon.schema.SchemaChange.UpdateColumnComment;
+import org.apache.paimon.schema.SchemaChange.UpdateColumnNullability;
+import org.apache.paimon.schema.SchemaChange.UpdateColumnPosition;
+import org.apache.paimon.schema.SchemaChange.UpdateColumnType;
+import org.apache.paimon.schema.SchemaChange.UpdateComment;
+import org.apache.paimon.types.DataTypeRoot;
+import org.junit.jupiter.api.Test;
+
+public class TestTableOpsUtils {
+
+ @Test
+ void testAddColumnFirst() {
+ assertTableChange(
+ addColumn(
+ getfieldName("col_1"),
+ IntegerType.get(),
+ AddColumn.class.getSimpleName(),
+ TableChange.ColumnPosition.first(),
+ false,
+ false),
+ AddColumn.class,
+ schemaChange -> {
+ AddColumn addColumn = (AddColumn) schemaChange;
+ assertEquals("col_1", addColumn.fieldName());
+ assertEquals(DataTypeRoot.INTEGER,
addColumn.dataType().getTypeRoot());
+ assertEquals(AddColumn.class.getSimpleName(),
addColumn.description());
+ assertNotNull(addColumn.move());
+ assertEquals(MoveType.FIRST, addColumn.move().type());
+ assertEquals("col_1", addColumn.move().fieldName());
+ assertNull(addColumn.move().referenceFieldName());
+ assertFalse(addColumn.dataType().isNullable());
+ });
+ }
+
+ @Test
+ void testAddColumnAfter() {
+ assertTableChange(
+ addColumn(
+ getfieldName("col_2"),
+ FloatType.get(),
+ AddColumn.class.getSimpleName(),
+ after("col_1"),
+ true,
+ false),
+ AddColumn.class,
+ schemaChange -> {
+ AddColumn addColumn = (AddColumn) schemaChange;
+ assertEquals("col_2", addColumn.fieldName());
+ assertEquals(DataTypeRoot.FLOAT, addColumn.dataType().getTypeRoot());
+ assertEquals(AddColumn.class.getSimpleName(),
addColumn.description());
+ assertNotNull(addColumn.move());
+ assertEquals(MoveType.AFTER, addColumn.move().type());
+ assertEquals("col_2", addColumn.move().fieldName());
+ assertEquals("col_1", addColumn.move().referenceFieldName());
+ assertTrue(addColumn.dataType().isNullable());
+ });
+ }
+
+ @Test
+ void testAddColumnDefaultPosition() {
+ assertTableChange(
+ addColumn(
+ getfieldName("col_3"),
+ ListType.of(StringType.get(), false),
+ AddColumn.class.getSimpleName(),
+ defaultPos(),
+ false,
+ false),
+ AddColumn.class,
+ schemaChange -> {
+ AddColumn addColumn = (AddColumn) schemaChange;
+ assertEquals("col_3", addColumn.fieldName());
+ assertEquals(DataTypeRoot.ARRAY, addColumn.dataType().getTypeRoot());
+ assertEquals(AddColumn.class.getSimpleName(),
addColumn.description());
+ assertNull(addColumn.move());
+ assertFalse(addColumn.dataType().isNullable());
+ });
+ }
+
+ @Test
+ void testAddColumnWitNullPosition() {
+ assertTableChange(
+ addColumn(
+ getfieldName("col_4"),
+ MapType.of(StringType.get(), IntegerType.get(), true),
+ AddColumn.class.getSimpleName(),
+ null,
+ false,
+ false),
+ AddColumn.class,
+ schemaChange -> {
+ AddColumn addColumn = (AddColumn) schemaChange;
+ assertEquals("col_4", addColumn.fieldName());
+ assertEquals(DataTypeRoot.MAP, addColumn.dataType().getTypeRoot());
+ assertEquals(AddColumn.class.getSimpleName(),
addColumn.description());
+ assertNull(addColumn.move());
+ assertFalse(addColumn.dataType().isNullable());
+ });
+ }
+
+ @Test
+ void testupdateColumnComment() {
+ assertTableChange(
+ updateColumnComment(getfieldName("col_1"),
UpdateColumnComment.class.getSimpleName()),
+ UpdateColumnComment.class,
+ schemaChange -> {
+ UpdateColumnComment updateColumnComment = (UpdateColumnComment)
schemaChange;
+ assertEquals("col_1",
getfieldName(updateColumnComment.fieldNames()));
+ assertEquals(
+ UpdateColumnComment.class.getSimpleName(),
updateColumnComment.newDescription());
+ });
+ }
+
+ @Test
+ void testUpdateColumnNullability() {
+ assertTableChange(
+ updateColumnNullability(getfieldName("col_2"), false),
+ UpdateColumnNullability.class,
+ schemaChange -> {
+ UpdateColumnNullability updateColumnNullability =
(UpdateColumnNullability) schemaChange;
+ assertEquals("col_2",
getfieldName(updateColumnNullability.fieldNames()));
+ assertFalse(updateColumnNullability.newNullability());
+ });
+ }
+
+ @Test
+ void testUpdateColumnType() {
+ assertTableChange(
+ updateColumnType(getfieldName("col_4"), DoubleType.get()),
+ UpdateColumnType.class,
+ schemaChange -> {
+ UpdateColumnType updateColumnType = (UpdateColumnType) schemaChange;
+ assertEquals("col_4", updateColumnType.fieldName());
+ assertEquals(DataTypeRoot.DOUBLE,
updateColumnType.newDataType().getTypeRoot());
+ });
+ }
+
+ @Test
+ void testRenameColumn() {
+ assertTableChange(
+ renameColumn(getfieldName("col_1"), "col_5"),
+ RenameColumn.class,
+ schemaChange -> {
+ RenameColumn renameColumn = (RenameColumn) schemaChange;
+ assertEquals("col_1", renameColumn.fieldName());
+ assertEquals("col_5", renameColumn.newName());
+ });
+ }
+
+ @Test
+ void testDeleteColumn() {
+ assertTableChange(
+ deleteColumn(getfieldName("col_2"), true),
+ DropColumn.class,
+ schemaChange -> {
+ DropColumn dropColumn = (DropColumn) schemaChange;
+ assertEquals("col_2", dropColumn.fieldName());
+ });
+ }
+
+ @Test
+ void testUpdateComment() {
+ assertTableChange(
+ updateComment(UpdateComment.class.getSimpleName()),
+ UpdateComment.class,
+ schemaChange -> {
+ UpdateComment updateComment = (UpdateComment) schemaChange;
+ assertEquals(UpdateComment.class.getSimpleName(),
updateComment.comment());
+ });
+ }
+
+ @Test
+ void testSetProperty() {
+ assertTableChange(
+ setProperty("prop_k1", "prop_v1"),
+ SetOption.class,
+ schemaChange -> {
+ SetOption setOption = (SetOption) schemaChange;
+ assertEquals("prop_k1", setOption.key());
+ assertEquals("prop_v1", setOption.value());
+ });
+ }
+
+ @Test
+ void testRemoveProperty() {
+ assertTableChange(
+ removeProperty("prop_k1"),
+ RemoveOption.class,
+ schemaChange -> {
+ RemoveOption removeOption = (RemoveOption) schemaChange;
+ assertEquals("prop_k1", removeOption.key());
+ });
+ }
+
+ @Test
+ void testUpdateColumnPosition() {
+ assertTableChange(
+ updateColumnPosition(getfieldName("col_3"), after("col_1")),
+ UpdateColumnPosition.class,
+ schemaChange -> {
+ UpdateColumnPosition updateColumnPosition = (UpdateColumnPosition)
schemaChange;
+ assertEquals("col_3", updateColumnPosition.move().fieldName());
+ assertEquals("col_1",
updateColumnPosition.move().referenceFieldName());
+ });
+ }
+
+ @Test
+ void testUnsupportedTableChanges() {
+ // Test UnsupportedOperationException with AddIndex, DeleteIndex,
RenameTable,
+ // UpdateColumnAutoIncrement, UpdateColumnDefaultValue.
+ Arrays.asList(
+ addIndex(IndexType.UNIQUE_KEY, "uk", new String[][] {{"col_5"}}),
+ deleteIndex("uk", true),
+ rename("tb_1"),
+ updateColumnAutoIncrement(getfieldName("col_5"), true),
+ updateColumnDefaultValue(
+ getfieldName("col_5"), Literals.of("default",
Types.VarCharType.of(255))))
+ .forEach(this::assertUnsupportedTableChange);
+
+ // Test IllegalArgumentException with AddColumn default value and auto
increment.
+ Arrays.asList(
+ Pair.of(
+ addColumn(
+ getfieldName("col_1"),
+ IntegerType.get(),
+ AddColumn.class.getSimpleName(),
+ TableChange.ColumnPosition.first(),
+ false,
+ false,
+ Literals.of("default", Types.StringType.get())),
+ "Paimon set column default value through table properties
instead of column info. Illegal column: col_1."),
+ Pair.of(
+ addColumn(
+ getfieldName("col_1"),
+ IntegerType.get(),
+ AddColumn.class.getSimpleName(),
+ TableChange.ColumnPosition.first(),
+ false,
+ true),
+ "Paimon does not support auto increment column. Illegal
column: col_1."))
+ .forEach(this::assertIllegalTableChange);
+ }
+
+ private void assertTableChange(
+ TableChange tableChange, Class<?> expected, Consumer<SchemaChange>
consumer) {
+ SchemaChange schemaChange = buildSchemaChange(tableChange);
+ assertEquals(expected, schemaChange.getClass());
+ consumer.accept(schemaChange);
+ }
+
+ private void assertUnsupportedTableChange(TableChange tableChange) {
+ UnsupportedOperationException exception =
+ assertThrowsExactly(
+ UnsupportedOperationException.class, () ->
buildSchemaChange(tableChange));
+ assertEquals(
+ String.format(
+ "Paimon does not support %s table change.",
tableChange.getClass().getSimpleName()),
+ exception.getMessage());
+ }
+
+ private void assertIllegalTableChange(Pair<TableChange, String> tableChange)
{
+ IllegalArgumentException exception =
+ assertThrowsExactly(
+ IllegalArgumentException.class, () ->
buildSchemaChange(tableChange.getKey()));
+ assertEquals(tableChange.getValue(), exception.getMessage());
+ }
+}