This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new 9b243bf2c [#3916] feat(catalog-lakehouse-paimon): Support alter table 
for Paimon Catalog (#4428)
9b243bf2c is described below

commit 9b243bf2cf4d25e4ec7984bd2cb150dab9874d79
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Aug 8 11:40:39 2024 +0800

    [#3916] feat(catalog-lakehouse-paimon): Support alter table for Paimon 
Catalog (#4428)
    
    ### What changes were proposed in this pull request?
    
    Support alter table for Paimon Catalog.
    
    ### Why are the changes needed?
    
    Fix: https://github.com/apache/gravitino/issues/3916
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    New UTs and ITs.
    
    Co-authored-by: cai can <[email protected]>
    Co-authored-by: caican <[email protected]>
---
 .../lakehouse/paimon/PaimonCatalogOperations.java  |  78 ++++-
 .../paimon/PaimonTablePropertiesMetadata.java      |  16 +-
 .../lakehouse/paimon/ops/PaimonCatalogOps.java     |  15 +
 .../lakehouse/paimon/utils/TableOpsUtils.java      | 130 ++++++++
 .../lakehouse/paimon/TestGravitinoPaimonTable.java |   8 +-
 .../integration/test/CatalogPaimonBaseIT.java      | 151 ++++++++++
 .../lakehouse/paimon/ops/TestPaimonCatalogOps.java | 320 +++++++++++++++++---
 .../lakehouse/paimon/utils/TestTableOpsUtils.java  | 333 +++++++++++++++++++++
 8 files changed, 1011 insertions(+), 40 deletions(-)

diff --git 
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogOperations.java
 
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogOperations.java
index 68a5ebcff..99487fe87 100644
--- 
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogOperations.java
+++ 
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonCatalogOperations.java
@@ -31,7 +31,10 @@ import java.time.Instant;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.SchemaChange;
@@ -43,6 +46,7 @@ import org.apache.gravitino.connector.HasPropertyMetadata;
 import org.apache.gravitino.connector.SupportsSchemas;
 import org.apache.gravitino.exceptions.ConnectionFailedException;
 import org.apache.gravitino.exceptions.NoSuchCatalogException;
+import org.apache.gravitino.exceptions.NoSuchColumnException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.NoSuchTableException;
 import org.apache.gravitino.exceptions.NonEmptySchemaException;
@@ -52,6 +56,7 @@ import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.rel.Column;
 import org.apache.gravitino.rel.TableCatalog;
 import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.TableChange.RenameTable;
 import org.apache.gravitino.rel.expressions.NamedReference;
 import org.apache.gravitino.rel.expressions.distributions.Distribution;
 import org.apache.gravitino.rel.expressions.distributions.Distributions;
@@ -84,6 +89,8 @@ public class PaimonCatalogOperations implements 
CatalogOperations, SupportsSchem
       "Paimon schema (database) %s already exists.";
   private static final String NO_SUCH_TABLE_EXCEPTION = "Paimon table %s does 
not exist.";
   private static final String TABLE_ALREADY_EXISTS_EXCEPTION = "Paimon table 
%s already exists.";
+  private static final String NO_SUCH_COLUMN_EXCEPTION =
+      "Paimon column of table %s does not exist.";
 
   /**
    * Initializes the Paimon catalog operations with the provided configuration.
@@ -402,7 +409,25 @@ public class PaimonCatalogOperations implements 
CatalogOperations, SupportsSchem
   @Override
   public GravitinoPaimonTable alterTable(NameIdentifier identifier, 
TableChange... changes)
       throws NoSuchTableException, IllegalArgumentException {
-    throw new UnsupportedOperationException("alterTable is unsupported now for 
Paimon Catalog.");
+    Optional<TableChange> renameTableOpt =
+        Arrays.stream(changes)
+            .filter(tableChange -> tableChange instanceof RenameTable)
+            .reduce((a, b) -> b);
+    if (renameTableOpt.isPresent()) {
+      String otherChanges =
+          Arrays.stream(changes)
+              .filter(tableChange -> !(tableChange instanceof RenameTable))
+              .map(String::valueOf)
+              .collect(Collectors.joining("\n"));
+      Preconditions.checkArgument(
+          StringUtils.isEmpty(otherChanges),
+          String.format(
+              "The operation to change the table name cannot be performed 
together with other operations. "
+                  + "The list of operations that you cannot perform includes: 
\n%s",
+              otherChanges));
+      return renameTable(identifier, (RenameTable) renameTableOpt.get());
+    }
+    return internalAlterTable(identifier, changes);
   }
 
   /**
@@ -470,4 +495,55 @@ public class PaimonCatalogOperations implements 
CatalogOperations, SupportsSchem
                     index.type() == Index.IndexType.PRIMARY_KEY,
                     "Paimon only supports primary key Index."));
   }
+
+  /**
+   * Performs rename table change with the provided identifier.
+   *
+   * @param identifier The identifier of the table to rename.
+   * @param renameTable Table Change to modify the table name.
+   * @return The renamed {@link GravitinoPaimonTable} instance.
+   * @throws NoSuchTableException If the table with the provided identifier 
does not exist.
+   * @throws IllegalArgumentException This exception will not be thrown in 
this method.
+   */
+  private GravitinoPaimonTable renameTable(
+      NameIdentifier identifier, TableChange.RenameTable renameTable)
+      throws NoSuchTableException, IllegalArgumentException {
+    NameIdentifier newNnameIdentifier =
+        NameIdentifier.of(identifier.namespace(), renameTable.getNewName());
+    NameIdentifier oldIdentifier = buildPaimonNameIdentifier(identifier);
+    NameIdentifier newIdentifier = 
buildPaimonNameIdentifier(newNnameIdentifier);
+    try {
+      paimonCatalogOps.renameTable(oldIdentifier.toString(), 
newIdentifier.toString());
+    } catch (Catalog.TableNotExistException e) {
+      throw new NoSuchTableException(e, NO_SUCH_TABLE_EXCEPTION, 
oldIdentifier);
+    } catch (Catalog.TableAlreadyExistException e) {
+      throw new TableAlreadyExistsException(e, TABLE_ALREADY_EXISTS_EXCEPTION, 
newIdentifier);
+    }
+    return loadTable(newNnameIdentifier);
+  }
+
+  /**
+   * Performs alter table changes with the provided identifier according to 
the specified {@link
+   * TableChange} changes.
+   *
+   * @param identifier The identifier of the table to alter.
+   * @param changes The changes to apply to the table.
+   * @return The altered {@link GravitinoPaimonTable} instance.
+   * @throws NoSuchTableException If the table with the provided identifier 
does not exist.
+   * @throws IllegalArgumentException This exception will not be thrown in 
this method.
+   */
+  private GravitinoPaimonTable internalAlterTable(NameIdentifier identifier, 
TableChange... changes)
+      throws NoSuchTableException, IllegalArgumentException {
+    NameIdentifier paimonNameIdentifier = 
buildPaimonNameIdentifier(identifier);
+    try {
+      paimonCatalogOps.alterTable(paimonNameIdentifier.toString(), changes);
+    } catch (Catalog.TableNotExistException e) {
+      throw new NoSuchTableException(e, NO_SUCH_TABLE_EXCEPTION, 
paimonNameIdentifier);
+    } catch (Catalog.ColumnNotExistException e) {
+      throw new NoSuchColumnException(e, NO_SUCH_COLUMN_EXCEPTION, 
paimonNameIdentifier);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    return loadTable(identifier);
+  }
 }
diff --git 
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonTablePropertiesMetadata.java
 
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonTablePropertiesMetadata.java
index 80145f29d..1c57e5b2c 100644
--- 
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonTablePropertiesMetadata.java
+++ 
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonTablePropertiesMetadata.java
@@ -35,7 +35,13 @@ import org.apache.gravitino.connector.PropertyEntry;
 public class PaimonTablePropertiesMetadata extends BasePropertiesMetadata {
 
   public static final String COMMENT = "comment";
-  public static final String CREATOR = "creator";
+  public static final String OWNER = "owner";
+  public static final String BUCKET_KEY = "bucket-key";
+  public static final String MERGE_ENGINE = "merge-engine";
+  public static final String SEQUENCE_FIELD = "sequence.field";
+  public static final String ROWKIND_FIELD = "rowkind.field";
+  public static final String PRIMARY_KEY = "primary-key";
+  public static final String PARTITION = "partition";
 
   private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;
 
@@ -43,7 +49,13 @@ public class PaimonTablePropertiesMetadata extends 
BasePropertiesMetadata {
     List<PropertyEntry<?>> propertyEntries =
         ImmutableList.of(
             stringReservedPropertyEntry(COMMENT, "The table comment", true),
-            stringReservedPropertyEntry(CREATOR, "The table creator", false));
+            stringReservedPropertyEntry(OWNER, "The table owner", false),
+            stringReservedPropertyEntry(BUCKET_KEY, "The table bucket key", 
false),
+            stringReservedPropertyEntry(MERGE_ENGINE, "The table merge 
engine", false),
+            stringReservedPropertyEntry(SEQUENCE_FIELD, "The table sequence 
field", false),
+            stringReservedPropertyEntry(ROWKIND_FIELD, "The table rowkind 
field", false),
+            stringReservedPropertyEntry(PRIMARY_KEY, "The table primary key", 
false),
+            stringReservedPropertyEntry(PARTITION, "The table partition", 
false));
     PROPERTIES_METADATA = Maps.uniqueIndex(propertyEntries, 
PropertyEntry::getName);
   }
 
diff --git 
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/ops/PaimonCatalogOps.java
 
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/ops/PaimonCatalogOps.java
index ed5c2de71..d009c8bb6 100644
--- 
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/ops/PaimonCatalogOps.java
+++ 
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/ops/PaimonCatalogOps.java
@@ -19,15 +19,20 @@
 package org.apache.gravitino.catalog.lakehouse.paimon.ops;
 
 import static 
org.apache.gravitino.catalog.lakehouse.paimon.utils.CatalogUtils.loadCatalogBackend;
+import static 
org.apache.gravitino.catalog.lakehouse.paimon.utils.TableOpsUtils.buildSchemaChanges;
 
 import com.google.common.base.Preconditions;
 import java.util.List;
 import java.util.Map;
 import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConfig;
+import org.apache.gravitino.rel.TableChange;
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Catalog.ColumnAlreadyExistException;
+import org.apache.paimon.catalog.Catalog.ColumnNotExistException;
 import org.apache.paimon.catalog.Catalog.DatabaseAlreadyExistException;
 import org.apache.paimon.catalog.Catalog.DatabaseNotEmptyException;
 import org.apache.paimon.catalog.Catalog.DatabaseNotExistException;
+import org.apache.paimon.catalog.Catalog.TableAlreadyExistException;
 import org.apache.paimon.catalog.Catalog.TableNotExistException;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.schema.Schema;
@@ -89,6 +94,16 @@ public class PaimonCatalogOps implements AutoCloseable {
     catalog.dropTable(tableIdentifier(tableName), false);
   }
 
+  public void alterTable(String tableName, TableChange... changes)
+      throws ColumnAlreadyExistException, TableNotExistException, 
ColumnNotExistException {
+    catalog.alterTable(tableIdentifier(tableName), 
buildSchemaChanges(changes), false);
+  }
+
+  public void renameTable(String fromTableName, String toTableName)
+      throws TableNotExistException, TableAlreadyExistException {
+    catalog.renameTable(tableIdentifier(fromTableName), 
tableIdentifier(toTableName), false);
+  }
+
   private Identifier tableIdentifier(String tableName) {
     return Identifier.fromString(tableName);
   }
diff --git 
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/utils/TableOpsUtils.java
 
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/utils/TableOpsUtils.java
index 86587d10e..2d8543673 100644
--- 
a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/utils/TableOpsUtils.java
+++ 
b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/utils/TableOpsUtils.java
@@ -18,20 +18,115 @@
  */
 package org.apache.gravitino.catalog.lakehouse.paimon.utils;
 
+import static 
org.apache.gravitino.catalog.lakehouse.paimon.utils.TypeUtils.toPaimonType;
+import static org.apache.paimon.schema.SchemaChange.addColumn;
+import static org.apache.paimon.schema.SchemaChange.dropColumn;
+import static org.apache.paimon.schema.SchemaChange.removeOption;
+import static org.apache.paimon.schema.SchemaChange.renameColumn;
+import static org.apache.paimon.schema.SchemaChange.setOption;
+import static org.apache.paimon.schema.SchemaChange.updateColumnComment;
+import static org.apache.paimon.schema.SchemaChange.updateColumnNullability;
+import static org.apache.paimon.schema.SchemaChange.updateColumnPosition;
+import static org.apache.paimon.schema.SchemaChange.updateColumnType;
+import static org.apache.paimon.schema.SchemaChange.updateComment;
+
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.gravitino.catalog.lakehouse.paimon.ops.PaimonCatalogOps;
 import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.TableChange.AddColumn;
+import org.apache.gravitino.rel.TableChange.After;
+import org.apache.gravitino.rel.TableChange.ColumnChange;
+import org.apache.gravitino.rel.TableChange.ColumnPosition;
+import org.apache.gravitino.rel.TableChange.Default;
+import org.apache.gravitino.rel.TableChange.DeleteColumn;
+import org.apache.gravitino.rel.TableChange.First;
+import org.apache.gravitino.rel.TableChange.RemoveProperty;
+import org.apache.gravitino.rel.TableChange.RenameColumn;
+import org.apache.gravitino.rel.TableChange.SetProperty;
+import org.apache.gravitino.rel.TableChange.UpdateColumnComment;
+import org.apache.gravitino.rel.TableChange.UpdateColumnNullability;
+import org.apache.gravitino.rel.TableChange.UpdateColumnPosition;
+import org.apache.gravitino.rel.TableChange.UpdateColumnType;
+import org.apache.gravitino.rel.TableChange.UpdateComment;
 import org.apache.gravitino.rel.expressions.Expression;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaChange.Move;
 
 /** Utilities of {@link PaimonCatalogOps} to support table operation. */
 public class TableOpsUtils {
 
+  public static final Joiner DOT = Joiner.on(".");
+
   public static void checkColumnCapability(
       String fieldName, Expression defaultValue, boolean autoIncrement) {
     checkColumnDefaultValue(fieldName, defaultValue);
     checkColumnAutoIncrement(fieldName, autoIncrement);
   }
 
+  public static List<SchemaChange> buildSchemaChanges(TableChange... 
tableChanges)
+      throws UnsupportedOperationException {
+    List<SchemaChange> schemaChanges = new ArrayList<>();
+    for (TableChange tableChange : tableChanges) {
+      schemaChanges.add(buildSchemaChange(tableChange));
+    }
+    return schemaChanges;
+  }
+
+  public static SchemaChange buildSchemaChange(TableChange tableChange)
+      throws UnsupportedOperationException {
+    if (tableChange instanceof ColumnChange) {
+      if (tableChange instanceof AddColumn) {
+        AddColumn addColumn = (AddColumn) tableChange;
+        String fieldName = getfieldName(addColumn);
+        checkColumnCapability(fieldName, addColumn.getDefaultValue(), 
addColumn.isAutoIncrement());
+        return addColumn(
+            fieldName,
+            toPaimonType(addColumn.getDataType()).copy(addColumn.isNullable()),
+            addColumn.getComment(),
+            move(fieldName, addColumn.getPosition()));
+      } else if (tableChange instanceof DeleteColumn) {
+        return dropColumn(getfieldName((DeleteColumn) tableChange));
+      } else if (tableChange instanceof RenameColumn) {
+        RenameColumn renameColumn = ((RenameColumn) tableChange);
+        return renameColumn(getfieldName(renameColumn), 
renameColumn.getNewName());
+      } else if (tableChange instanceof UpdateColumnComment) {
+        UpdateColumnComment updateColumnComment = (UpdateColumnComment) 
tableChange;
+        return updateColumnComment(
+            getfieldName(updateColumnComment), 
updateColumnComment.getNewComment());
+      } else if (tableChange instanceof UpdateColumnNullability) {
+        UpdateColumnNullability updateColumnNullability = 
(UpdateColumnNullability) tableChange;
+        return updateColumnNullability(
+            getfieldName(updateColumnNullability), 
updateColumnNullability.nullable());
+      } else if (tableChange instanceof UpdateColumnPosition) {
+        UpdateColumnPosition updateColumnPosition = (UpdateColumnPosition) 
tableChange;
+        Preconditions.checkArgument(
+            !(updateColumnPosition.getPosition() instanceof Default),
+            "Default position is not supported for Paimon update column 
position.");
+        return updateColumnPosition(
+            move(getfieldName(updateColumnPosition), 
updateColumnPosition.getPosition()));
+      } else if (tableChange instanceof UpdateColumnType) {
+        UpdateColumnType updateColumnType = (UpdateColumnType) tableChange;
+        return updateColumnType(
+            getfieldName(updateColumnType), 
toPaimonType(updateColumnType.getNewDataType()));
+      }
+    } else if (tableChange instanceof UpdateComment) {
+      return updateComment(((UpdateComment) tableChange).getNewComment());
+    } else if (tableChange instanceof SetProperty) {
+      SetProperty setProperty = ((SetProperty) tableChange);
+      return setOption(setProperty.getProperty(), setProperty.getValue());
+    } else if (tableChange instanceof RemoveProperty) {
+      RemoveProperty removeProperty = (RemoveProperty) tableChange;
+      return removeOption(removeProperty.getProperty());
+    }
+    throw new UnsupportedOperationException(
+        String.format(
+            "Paimon does not support %s table change.", 
tableChange.getClass().getSimpleName()));
+  }
+
   private static void checkColumnDefaultValue(String fieldName, Expression 
defaultValue) {
     Preconditions.checkArgument(
         defaultValue.equals(Column.DEFAULT_VALUE_NOT_SET),
@@ -46,4 +141,39 @@ public class TableOpsUtils {
         String.format(
             "Paimon does not support auto increment column. Illegal column: 
%s.", fieldName));
   }
+
+  private static void checkNestedColumn(String[] fieldNames) {
+    Preconditions.checkArgument(
+        fieldNames.length == 1,
+        String.format(
+            "Paimon does not support update nested column. Illegal column: 
%s.",
+            getfieldName(fieldNames)));
+  }
+
+  public static String[] getfieldName(String fieldName) {
+    return new String[] {fieldName};
+  }
+
+  public static String getfieldName(String[] fieldName) {
+    return DOT.join(fieldName);
+  }
+
+  private static String getfieldName(ColumnChange columnChange) {
+    return getfieldName(columnChange.fieldName());
+  }
+
+  private static Move move(String fieldName, ColumnPosition columnPosition)
+      throws UnsupportedOperationException {
+    if (columnPosition instanceof After) {
+      return Move.after(fieldName, ((After) columnPosition).getColumn());
+    } else if (columnPosition instanceof Default) {
+      return null;
+    } else if (columnPosition instanceof First) {
+      return Move.first(fieldName);
+    }
+    throw new UnsupportedOperationException(
+        String.format(
+            "Paimon does not support %s column position.",
+            columnPosition.getClass().getSimpleName()));
+  }
 }
diff --git 
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/TestGravitinoPaimonTable.java
 
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/TestGravitinoPaimonTable.java
index a34c27957..3a522822a 100644
--- 
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/TestGravitinoPaimonTable.java
+++ 
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/TestGravitinoPaimonTable.java
@@ -384,7 +384,13 @@ public class TestGravitinoPaimonTable {
           initBackendCatalogProperties(), entity.toCatalogInfo(), 
PAIMON_PROPERTIES_METADATA);
       Map<String, String> map = Maps.newHashMap();
       map.put(PaimonTablePropertiesMetadata.COMMENT, "test");
-      map.put(PaimonTablePropertiesMetadata.CREATOR, "test");
+      map.put(PaimonTablePropertiesMetadata.OWNER, "test");
+      map.put(PaimonTablePropertiesMetadata.BUCKET_KEY, "test");
+      map.put(PaimonTablePropertiesMetadata.MERGE_ENGINE, "test");
+      map.put(PaimonTablePropertiesMetadata.SEQUENCE_FIELD, "test");
+      map.put(PaimonTablePropertiesMetadata.ROWKIND_FIELD, "test");
+      map.put(PaimonTablePropertiesMetadata.PRIMARY_KEY, "test");
+      map.put(PaimonTablePropertiesMetadata.PARTITION, "test");
       for (Map.Entry<String, String> entry : map.entrySet()) {
         HashMap<String, String> properties =
             new HashMap<String, String>() {
diff --git 
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java
 
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java
index cd0ca2c5b..a8f321f42 100644
--- 
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java
+++ 
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java
@@ -23,6 +23,7 @@ import static 
org.apache.gravitino.rel.expressions.transforms.Transforms.identit
 import static org.apache.gravitino.rel.indexes.Indexes.primary;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import java.time.LocalDate;
 import java.time.format.DateTimeFormatter;
@@ -55,9 +56,11 @@ import 
org.apache.gravitino.integration.test.util.GravitinoITUtils;
 import org.apache.gravitino.rel.Column;
 import org.apache.gravitino.rel.Table;
 import org.apache.gravitino.rel.TableCatalog;
+import org.apache.gravitino.rel.TableChange;
 import org.apache.gravitino.rel.expressions.NamedReference;
 import org.apache.gravitino.rel.expressions.distributions.Distribution;
 import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.literals.Literals;
 import org.apache.gravitino.rel.expressions.sorts.SortOrder;
 import org.apache.gravitino.rel.expressions.sorts.SortOrders;
 import org.apache.gravitino.rel.expressions.transforms.Transform;
@@ -96,6 +99,7 @@ public abstract class CatalogPaimonBaseIT extends AbstractIT {
   private static final String PAIMON_COL_NAME3 = "paimon_col_name3";
   private static final String PAIMON_COL_NAME4 = "paimon_col_name4";
   private static final String PAIMON_COL_NAME5 = "paimon_col_name5";
+  private static final String alertTableName = "alert_table_name";
   private String metalakeName = 
GravitinoITUtils.genRandomName("paimon_it_metalake");
   private String catalogName = 
GravitinoITUtils.genRandomName("paimon_it_catalog");
   private String schemaName = 
GravitinoITUtils.genRandomName("paimon_it_schema");
@@ -605,6 +609,153 @@ public abstract class CatalogPaimonBaseIT extends 
AbstractIT {
     Assertions.assertEquals(0, paimonCatalog.listTables(schemaName).size());
   }
 
+  @Test
+  public void testAlterPaimonTable() {
+    Column[] columns = createColumns();
+    catalog
+        .asTableCatalog()
+        .createTable(
+            NameIdentifier.of(schemaName, tableName), columns, table_comment, 
createProperties());
+
+    // The RenameTable operation cannot be performed together with other 
operations.
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            catalog
+                .asTableCatalog()
+                .alterTable(
+                    NameIdentifier.of(schemaName, tableName),
+                    TableChange.rename(alertTableName),
+                    TableChange.updateComment(table_comment + "_new")));
+
+    // rename table.
+    catalog
+        .asTableCatalog()
+        .alterTable(NameIdentifier.of(schemaName, tableName), 
TableChange.rename(alertTableName));
+
+    // other operations.
+    catalog
+        .asTableCatalog()
+        .alterTable(
+            NameIdentifier.of(schemaName, alertTableName),
+            TableChange.updateComment(table_comment + "_new"),
+            TableChange.setProperty("key2", "val2_new"),
+            TableChange.removeProperty("key1"),
+            TableChange.addColumn(
+                new String[] {"paimon_col_name5_for_add"}, 
Types.StringType.get()),
+            TableChange.renameColumn(new String[] {PAIMON_COL_NAME2}, 
"paimon_col_name2_new"),
+            TableChange.updateColumnComment(new String[] {PAIMON_COL_NAME1}, 
"comment_new"),
+            TableChange.updateColumnType(new String[] {PAIMON_COL_NAME1}, 
Types.StringType.get()),
+            TableChange.updateColumnNullability(new String[] 
{PAIMON_COL_NAME1}, false));
+
+    Table table = 
catalog.asTableCatalog().loadTable(NameIdentifier.of(schemaName, 
alertTableName));
+    Assertions.assertEquals(alertTableName, table.name());
+    Assertions.assertEquals("val2_new", table.properties().get("key2"));
+
+    Assertions.assertEquals(PAIMON_COL_NAME1, table.columns()[0].name());
+    Assertions.assertEquals(Types.StringType.get(), 
table.columns()[0].dataType());
+    Assertions.assertEquals("comment_new", table.columns()[0].comment());
+    Assertions.assertFalse(table.columns()[0].nullable());
+
+    Assertions.assertEquals("paimon_col_name2_new", table.columns()[1].name());
+    Assertions.assertEquals(Types.DateType.get(), 
table.columns()[1].dataType());
+    Assertions.assertEquals("col_2_comment", table.columns()[1].comment());
+
+    Assertions.assertEquals(PAIMON_COL_NAME3, table.columns()[2].name());
+    Assertions.assertEquals(Types.StringType.get(), 
table.columns()[2].dataType());
+    Assertions.assertEquals("col_3_comment", table.columns()[2].comment());
+
+    Assertions.assertEquals(PAIMON_COL_NAME4, table.columns()[3].name());
+    Assertions.assertEquals(columns[3].dataType(), 
table.columns()[3].dataType());
+    Assertions.assertEquals("col_4_comment", table.columns()[3].comment());
+
+    Assertions.assertEquals("paimon_col_name5_for_add", 
table.columns()[4].name());
+    Assertions.assertEquals(Types.StringType.get(), 
table.columns()[4].dataType());
+    Assertions.assertNull(table.columns()[4].comment());
+
+    // test add column with exceptions
+    // 1. with column default value
+    TableChange withDefaultValue =
+        TableChange.addColumn(
+            new String[] {"newColumn"}, Types.ByteType.get(), "comment", 
Literals.NULL);
+    RuntimeException exception =
+        Assertions.assertThrows(
+            RuntimeException.class,
+            () ->
+                catalog
+                    .asTableCatalog()
+                    .alterTable(NameIdentifier.of(schemaName, alertTableName), 
withDefaultValue));
+    Assertions.assertTrue(
+        exception
+            .getMessage()
+            .contains(
+                "Paimon set column default value through table properties 
instead of column info"));
+
+    // 2. with column autoIncrement
+    TableChange withAutoIncrement =
+        TableChange.addColumn(
+            new String[] {"newColumn"}, Types.ByteType.get(), "comment", null, 
true, true);
+    exception =
+        Assertions.assertThrows(
+            RuntimeException.class,
+            () ->
+                catalog
+                    .asTableCatalog()
+                    .alterTable(NameIdentifier.of(schemaName, alertTableName), 
withAutoIncrement));
+    Assertions.assertTrue(
+        exception.getMessage().contains("Paimon does not support auto 
increment column"));
+
+    // update column position
+    Column col1 = Column.of("name", Types.StringType.get(), "comment");
+    Column col2 = Column.of("address", Types.StringType.get(), "comment");
+    Column col3 = Column.of("date_of_birth", Types.DateType.get(), "comment");
+
+    Column[] newColumns = new Column[] {col1, col2, col3};
+    NameIdentifier tableIdentifier =
+        NameIdentifier.of(schemaName, 
GravitinoITUtils.genRandomName("PaimonAlterTableIT"));
+    catalog
+        .asTableCatalog()
+        .createTable(
+            tableIdentifier,
+            newColumns,
+            table_comment,
+            ImmutableMap.of(),
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            new SortOrder[0],
+            new Index[0]);
+
+    catalog
+        .asTableCatalog()
+        .alterTable(
+            tableIdentifier,
+            TableChange.updateColumnPosition(
+                new String[] {col1.name()}, 
TableChange.ColumnPosition.after(col2.name())),
+            TableChange.updateColumnPosition(
+                new String[] {col3.name()}, 
TableChange.ColumnPosition.first()));
+
+    Table updateColumnPositionTable = 
catalog.asTableCatalog().loadTable(tableIdentifier);
+
+    Column[] updateCols = updateColumnPositionTable.columns();
+    Assertions.assertEquals(3, updateCols.length);
+    Assertions.assertEquals(col3.name(), updateCols[0].name());
+    Assertions.assertEquals(col2.name(), updateCols[1].name());
+    Assertions.assertEquals(col1.name(), updateCols[2].name());
+
+    // delete column
+    Assertions.assertDoesNotThrow(
+        () ->
+            catalog
+                .asTableCatalog()
+                .alterTable(
+                    tableIdentifier,
+                    TableChange.deleteColumn(new String[] {col3.name()}, true),
+                    TableChange.deleteColumn(new String[] {col2.name()}, 
true)));
+    Table delColTable = catalog.asTableCatalog().loadTable(tableIdentifier);
+    Assertions.assertEquals(1, delColTable.columns().length);
+    Assertions.assertEquals(col1.name(), delColTable.columns()[0].name());
+  }
+
   @Test
   void testOperationDataOfPaimonTable() {
     Column[] columns = createColumns();
diff --git 
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/ops/TestPaimonCatalogOps.java
 
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/ops/TestPaimonCatalogOps.java
index d0534c2b3..dbae9c713 100644
--- 
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/ops/TestPaimonCatalogOps.java
+++ 
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/ops/TestPaimonCatalogOps.java
@@ -18,24 +18,57 @@
  */
 package org.apache.gravitino.catalog.lakehouse.paimon.ops;
 
+import static 
org.apache.gravitino.catalog.lakehouse.paimon.utils.TableOpsUtils.getfieldName;
+import static 
org.apache.gravitino.catalog.lakehouse.paimon.utils.TypeUtils.toPaimonType;
+import static org.apache.gravitino.rel.TableChange.ColumnPosition.after;
+import static org.apache.gravitino.rel.TableChange.ColumnPosition.defaultPos;
+import static org.apache.gravitino.rel.TableChange.ColumnPosition.first;
+import static org.apache.gravitino.rel.TableChange.addColumn;
+import static org.apache.gravitino.rel.TableChange.deleteColumn;
+import static org.apache.gravitino.rel.TableChange.removeProperty;
+import static org.apache.gravitino.rel.TableChange.renameColumn;
+import static org.apache.gravitino.rel.TableChange.setProperty;
+import static org.apache.gravitino.rel.TableChange.updateColumnComment;
+import static org.apache.gravitino.rel.TableChange.updateColumnNullability;
+import static org.apache.gravitino.rel.TableChange.updateColumnPosition;
+import static org.apache.gravitino.rel.TableChange.updateColumnType;
+import static org.apache.gravitino.rel.TableChange.updateComment;
 import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import java.io.File;
+import java.util.List;
 import java.util.Map;
+import java.util.function.Consumer;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import 
org.apache.gravitino.catalog.lakehouse.paimon.PaimonCatalogPropertiesMetadata;
 import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConfig;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.TableChange.ColumnPosition;
+import org.apache.gravitino.rel.TableChange.UpdateColumnComment;
+import org.apache.gravitino.rel.TableChange.UpdateComment;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.rel.types.Types;
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Catalog.ColumnAlreadyExistException;
+import org.apache.paimon.catalog.Catalog.ColumnNotExistException;
+import org.apache.paimon.catalog.Catalog.DatabaseNotExistException;
+import org.apache.paimon.catalog.Catalog.TableAlreadyExistException;
 import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaChange.AddColumn;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.DateType;
 import org.apache.paimon.types.IntType;
@@ -68,6 +101,7 @@ public class TestPaimonCatalogOps {
             new PaimonConfig(
                 ImmutableMap.of(PaimonCatalogPropertiesMetadata.WAREHOUSE, 
warehouse.getPath())));
     createDatabase();
+    createTable();
   }
 
   @AfterEach
@@ -79,41 +113,10 @@ public class TestPaimonCatalogOps {
   }
 
   @Test
-  void testTableOperations() throws Exception {
+  void testLoadListAndDropTableOperations() throws Exception {
     // list tables
     Assertions.assertEquals(
-        0, 
paimonCatalogOps.listTables(IDENTIFIER.namespace().toString()).size());
-
-    // create table
-    Pair<String, Schema> tableInfo =
-        Pair.of(
-            IDENTIFIER.toString(),
-            Schema.newBuilder()
-                .column("col_1", DataTypes.INT().notNull(), 
IntType.class.getSimpleName())
-                .column("col_2", DataTypes.STRING(), 
VarCharType.class.getSimpleName())
-                .column("col_3", DataTypes.STRING().notNull(), 
VarCharType.class.getSimpleName())
-                .column(
-                    "col_4",
-                    DataTypes.ARRAY(
-                        RowType.builder()
-                            .field(
-                                "sub_col_1",
-                                DataTypes.DATE(),
-                                RowType.class.getSimpleName() + 
DateType.class.getSimpleName())
-                            .field(
-                                "sub_col_2",
-                                DataTypes.MAP(DataTypes.STRING(), 
DataTypes.INT()),
-                                RowType.class.getSimpleName() + 
MapType.class.getSimpleName())
-                            .field(
-                                "sub_col_3",
-                                DataTypes.TIMESTAMP().notNull(),
-                                RowType.class.getSimpleName() + 
TimestampType.class.getSimpleName())
-                            .build()),
-                    ArrayType.class.getSimpleName())
-                .comment(COMMENT)
-                .options(OPTIONS)
-                .build());
-    paimonCatalogOps.createTable(tableInfo.getKey(), tableInfo.getValue());
+        1, 
paimonCatalogOps.listTables(IDENTIFIER.namespace().toString()).size());
 
     // load table
     Table table = paimonCatalogOps.loadTable(IDENTIFIER.toString());
@@ -149,8 +152,6 @@ public class TestPaimonCatalogOps {
     assertEquals(COMMENT, table.comment().get());
     assertEquals(OPTIONS.get(BUCKET.key()), table.options().get(BUCKET.key()));
 
-    // TODO: alter table is unsupported now.
-
     // drop table
     Assertions.assertDoesNotThrow(() -> 
paimonCatalogOps.dropTable(IDENTIFIER.toString()));
     Assertions.assertThrowsExactly(
@@ -162,10 +163,225 @@ public class TestPaimonCatalogOps {
         0, 
paimonCatalogOps.listTables(IDENTIFIER.namespace().toString()).size());
 
     // create a new table to make database not empty to test drop database 
cascade
-    paimonCatalogOps.createTable(tableInfo.getKey(), tableInfo.getValue());
+    createTable();
     
Assertions.assertNotNull(paimonCatalogOps.loadTable(IDENTIFIER.toString()));
   }
 
+  @Test
+  void testAddColumn() throws Exception {
+    // Test AddColumn after column.
+    assertAddColumn(5, Types.BooleanType.get(), after("col_2"), 2);
+    // Test AddColumn first column.
+    assertAddColumn(6, Types.FloatType.get(), first(), 0);
+    // Test AddColumn last column.
+    assertAddColumn(7, Types.DoubleType.get(), defaultPos(), 6);
+    assertAddColumn(8, Types.DoubleType.get(), null, 7);
+    // Test NullPointerException with AddColumn for after non-existent column.
+    assertThrowsExactly(
+        NullPointerException.class,
+        () -> assertAddColumn(9, Types.LongType.get(), after("col_10"), null));
+  }
+
+  @Test
+  void testUpdateColumnComment() throws Exception {
+    assertAlterTable(
+        table -> {
+          DataField dataField = table.rowType().getFields().get(0);
+          assertEquals("col_1", dataField.name());
+          assertEquals(UpdateColumnComment.class.getSimpleName(), 
dataField.description());
+        },
+        updateColumnComment(getfieldName("col_1"), 
UpdateColumnComment.class.getSimpleName()));
+    assertColumnNotExist(
+        updateColumnComment(getfieldName("col_10"), 
UpdateComment.class.getSimpleName()));
+  }
+
+  @Test
+  void testUpdateColumnNullability() throws Exception {
+    assertAlterTable(
+        table -> {
+          DataField dataField = table.rowType().getFields().get(1);
+          assertEquals("col_2", dataField.name());
+          assertFalse(dataField.type().isNullable());
+        },
+        updateColumnNullability(getfieldName("col_2"), false));
+    assertColumnNotExist(updateColumnNullability(getfieldName("col_5"), true));
+  }
+
+  @Test
+  void testUpdateColumnPosition() throws Exception {
+    // Test UpdateColumnPosition after column.
+    assertUpdateColumnPosition(3, after("col_1"), 0, 2, 1, 3);
+    // Test UpdateColumnPosition first column.
+    assertUpdateColumnPosition(4, first(), 1, 3, 2, 0);
+    // Test NullPointerException with UpdateColumnPosition for non-existent 
column.
+    assertThrowsExactly(
+        IllegalArgumentException.class, () -> assertUpdateColumnPosition(5, 
defaultPos()));
+    // Test NullPointerException with UpdateColumnPosition for after 
non-existent column.
+    assertThrowsExactly(
+        NullPointerException.class, () -> assertUpdateColumnPosition(1, 
after("col_5")));
+  }
+
+  @Test
+  void testUpdateColumnType() throws Exception {
+    assertAlterTable(
+        table -> {
+          DataField dataField = table.rowType().getFields().get(0);
+          assertEquals("col_1", dataField.name());
+          assertEquals(DataTypes.BIGINT(), dataField.type());
+        },
+        updateColumnType(getfieldName("col_1"), Types.LongType.get()));
+    assertColumnNotExist(updateColumnType(getfieldName("col_5"), 
Types.ShortType.get()));
+    assertThrowsExactly(
+        IllegalStateException.class,
+        () ->
+            assertAlterTable(
+                table -> {}, updateColumnType(getfieldName("col_1"), 
Types.DateType.get())));
+    assertThrowsExactly(
+        IllegalStateException.class,
+        () ->
+            assertAlterTable(
+                table -> {}, updateColumnType(getfieldName("col_4"), 
Types.LongType.get())));
+  }
+
+  @Test
+  void testRenameColumn() throws Exception {
+    assertAlterTable(
+        table -> {
+          List<String> fieldNames = table.rowType().getFieldNames();
+          assertFalse(fieldNames.contains("col2"));
+          assertEquals("col_5", fieldNames.get(1));
+          assertEquals(4, fieldNames.size());
+        },
+        renameColumn(getfieldName("col_2"), "col_5"));
+    assertColumnNotExist(renameColumn(getfieldName("col_6"), "col_7"));
+    assertThrowsExactly(
+        ColumnAlreadyExistException.class,
+        () -> assertAlterTable(table -> {}, 
renameColumn(getfieldName("col_1"), "col_4")));
+  }
+
+  @Test
+  void testDeleteColumn() throws Exception {
+    assertAlterTable(
+        table -> {
+          List<String> fieldNames = table.rowType().getFieldNames();
+          assertFalse(fieldNames.contains("col_2"));
+          assertEquals("col_3", fieldNames.get(1));
+          assertEquals(3, fieldNames.size());
+        },
+        deleteColumn(getfieldName("col_2"), true));
+    assertColumnNotExist(deleteColumn(getfieldName("col_5"), true));
+    assertColumnNotExist(deleteColumn(getfieldName("col_5"), false));
+  }
+
+  @Test
+  void testUpdateComment() throws Exception {
+    assertAlterTable(
+        table -> {
+          assertTrue(table.comment().isPresent());
+          assertEquals(UpdateComment.class.getSimpleName(), 
table.comment().get());
+        },
+        updateComment(UpdateComment.class.getSimpleName()));
+  }
+
+  @Test
+  void testSetAndRemoveProperty() throws Exception {
+    String propertyKey = "test_property_key_1";
+    assertFalse(
+        
paimonCatalogOps.loadTable(IDENTIFIER.toString()).options().containsKey(propertyKey));
+    // Test SetProperty with non-existent property.
+    String propertyValue = "test_property_value_1";
+    assertAlterTable(
+        table -> {
+          Map<String, String> options = table.options();
+          assertTrue(options.containsKey(propertyKey));
+          assertEquals(propertyValue, options.get(propertyKey));
+        },
+        setProperty(propertyKey, propertyValue));
+    // Test SetProperty with overwrite existing property.
+    String newPropertyValue = "test_property_value_2";
+    assertAlterTable(
+        table -> {
+          Map<String, String> options = table.options();
+          assertTrue(options.containsKey(propertyKey));
+          assertEquals(newPropertyValue, options.get(propertyKey));
+        },
+        setProperty(propertyKey, newPropertyValue));
+    // Test RemoveProperty with existing property.
+    assertAlterTable(
+        table -> assertFalse(table.options().containsKey(propertyKey)),
+        removeProperty(propertyKey));
+    // Test RemoveProperty with non-existent property.
+    assertDoesNotThrow(() -> assertAlterTable(table -> {}, 
removeProperty(propertyKey)));
+  }
+
+  @Test
+  void testMultipleAlterTable() throws Exception {
+    assertAlterTable(
+        table -> {
+          List<String> fieldNames = table.rowType().getFieldNames();
+          assertEquals("col_5", fieldNames.get(0));
+          assertFalse(fieldNames.contains("col_2"));
+          assertEquals(3, fieldNames.size());
+          Map<String, String> options = table.options();
+          assertTrue(options.containsKey("test_property_key"));
+          assertEquals("test_property_value", 
options.get("test_property_key"));
+        },
+        renameColumn(getfieldName("col_1"), "col_5"),
+        deleteColumn(getfieldName("col_2"), true),
+        setProperty("test_property_key", "test_property_value"));
+  }
+
+  private void assertUpdateColumnPosition(int column, ColumnPosition 
columnPosition, int... fields)
+      throws Exception {
+    String columnName = "col_" + column;
+    assertAlterTable(
+        table -> {
+          List<String> fieldNames = table.rowType().getFieldNames();
+          assertEquals("col_1", fieldNames.get(fields[0]));
+          assertEquals("col_2", fieldNames.get(fields[1]));
+          assertEquals("col_3", fieldNames.get(fields[2]));
+          assertEquals("col_4", fieldNames.get(fields[3]));
+        },
+        updateColumnPosition(getfieldName(columnName), columnPosition));
+  }
+
+  private void assertAddColumn(int column, Type type, ColumnPosition 
columnPosition, Integer field)
+      throws Exception {
+    String columnName = "col_" + column;
+    assertAlterTable(
+        table -> {
+          DataField dataField = table.rowType().getFields().get(field);
+          assertEquals(columnName, dataField.name());
+          assertEquals(toPaimonType(type), dataField.type());
+          assertEquals(AddColumn.class.getSimpleName(), 
dataField.description());
+          assertTrue(dataField.type().isNullable());
+        },
+        addColumn(
+            getfieldName(columnName),
+            type,
+            SchemaChange.AddColumn.class.getSimpleName(),
+            columnPosition,
+            true,
+            false));
+  }
+
+  private void assertColumnNotExist(TableChange tableChange) {
+    assertThrowsExactly(
+        ColumnNotExistException.class, () -> assertAlterTable(table -> {}, 
tableChange));
+  }
+
+  private void assertAlterTable(Consumer<Table> consumer, TableChange... 
tableChanges)
+      throws Exception {
+    consumer.accept(alterTable(tableChanges));
+  }
+
+  private Table alterTable(TableChange... tableChanges)
+      throws Catalog.ColumnAlreadyExistException, 
Catalog.TableNotExistException,
+          Catalog.ColumnNotExistException {
+    paimonCatalogOps.alterTable(IDENTIFIER.toString(), tableChanges);
+    return paimonCatalogOps.loadTable(IDENTIFIER.toString());
+  }
+
   private void createDatabase() throws Exception {
     // list databases
     assertEquals(0, paimonCatalogOps.listDatabases().size());
@@ -177,6 +393,38 @@ public class TestPaimonCatalogOps {
     assertNotNull(paimonCatalogOps.loadDatabase(DATABASE));
   }
 
+  private void createTable() throws TableAlreadyExistException, 
DatabaseNotExistException {
+    Pair<String, Schema> tableInfo =
+        Pair.of(
+            IDENTIFIER.toString(),
+            Schema.newBuilder()
+                .column("col_1", DataTypes.INT().notNull(), 
IntType.class.getSimpleName())
+                .column("col_2", DataTypes.STRING(), 
VarCharType.class.getSimpleName())
+                .column("col_3", DataTypes.STRING().notNull(), 
VarCharType.class.getSimpleName())
+                .column(
+                    "col_4",
+                    DataTypes.ARRAY(
+                        RowType.builder()
+                            .field(
+                                "sub_col_1",
+                                DataTypes.DATE(),
+                                RowType.class.getSimpleName() + 
DateType.class.getSimpleName())
+                            .field(
+                                "sub_col_2",
+                                DataTypes.MAP(DataTypes.STRING(), 
DataTypes.INT()),
+                                RowType.class.getSimpleName() + 
MapType.class.getSimpleName())
+                            .field(
+                                "sub_col_3",
+                                DataTypes.TIMESTAMP().notNull(),
+                                RowType.class.getSimpleName() + 
TimestampType.class.getSimpleName())
+                            .build()),
+                    ArrayType.class.getSimpleName())
+                .comment(COMMENT)
+                .options(OPTIONS)
+                .build());
+    paimonCatalogOps.createTable(tableInfo.getKey(), tableInfo.getValue());
+  }
+
   private void dropDatabase() throws Exception {
     Assertions.assertEquals(1, paimonCatalogOps.listDatabases().size());
     Assertions.assertEquals(1, paimonCatalogOps.listTables(DATABASE).size());
diff --git 
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/utils/TestTableOpsUtils.java
 
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/utils/TestTableOpsUtils.java
new file mode 100644
index 000000000..d82a2bd31
--- /dev/null
+++ 
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/utils/TestTableOpsUtils.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.lakehouse.paimon.utils;
+
+import static 
org.apache.gravitino.catalog.lakehouse.paimon.utils.TableOpsUtils.buildSchemaChange;
+import static 
org.apache.gravitino.catalog.lakehouse.paimon.utils.TableOpsUtils.getfieldName;
+import static org.apache.gravitino.rel.TableChange.ColumnPosition.after;
+import static org.apache.gravitino.rel.TableChange.ColumnPosition.defaultPos;
+import static org.apache.gravitino.rel.TableChange.addColumn;
+import static org.apache.gravitino.rel.TableChange.addIndex;
+import static org.apache.gravitino.rel.TableChange.deleteColumn;
+import static org.apache.gravitino.rel.TableChange.deleteIndex;
+import static org.apache.gravitino.rel.TableChange.removeProperty;
+import static org.apache.gravitino.rel.TableChange.rename;
+import static org.apache.gravitino.rel.TableChange.renameColumn;
+import static org.apache.gravitino.rel.TableChange.setProperty;
+import static org.apache.gravitino.rel.TableChange.updateColumnAutoIncrement;
+import static org.apache.gravitino.rel.TableChange.updateColumnComment;
+import static org.apache.gravitino.rel.TableChange.updateColumnDefaultValue;
+import static org.apache.gravitino.rel.TableChange.updateColumnNullability;
+import static org.apache.gravitino.rel.TableChange.updateColumnPosition;
+import static org.apache.gravitino.rel.TableChange.updateColumnType;
+import static org.apache.gravitino.rel.TableChange.updateComment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Arrays;
+import java.util.function.Consumer;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.indexes.Index.IndexType;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.rel.types.Types.DoubleType;
+import org.apache.gravitino.rel.types.Types.FloatType;
+import org.apache.gravitino.rel.types.Types.IntegerType;
+import org.apache.gravitino.rel.types.Types.ListType;
+import org.apache.gravitino.rel.types.Types.MapType;
+import org.apache.gravitino.rel.types.Types.StringType;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaChange.AddColumn;
+import org.apache.paimon.schema.SchemaChange.DropColumn;
+import org.apache.paimon.schema.SchemaChange.Move.MoveType;
+import org.apache.paimon.schema.SchemaChange.RemoveOption;
+import org.apache.paimon.schema.SchemaChange.RenameColumn;
+import org.apache.paimon.schema.SchemaChange.SetOption;
+import org.apache.paimon.schema.SchemaChange.UpdateColumnComment;
+import org.apache.paimon.schema.SchemaChange.UpdateColumnNullability;
+import org.apache.paimon.schema.SchemaChange.UpdateColumnPosition;
+import org.apache.paimon.schema.SchemaChange.UpdateColumnType;
+import org.apache.paimon.schema.SchemaChange.UpdateComment;
+import org.apache.paimon.types.DataTypeRoot;
+import org.junit.jupiter.api.Test;
+
+public class TestTableOpsUtils {
+
+  @Test
+  void testAddColumnFirst() {
+    assertTableChange(
+        addColumn(
+            getfieldName("col_1"),
+            IntegerType.get(),
+            AddColumn.class.getSimpleName(),
+            TableChange.ColumnPosition.first(),
+            false,
+            false),
+        AddColumn.class,
+        schemaChange -> {
+          AddColumn addColumn = (AddColumn) schemaChange;
+          assertEquals("col_1", addColumn.fieldName());
+          assertEquals(DataTypeRoot.INTEGER, 
addColumn.dataType().getTypeRoot());
+          assertEquals(AddColumn.class.getSimpleName(), 
addColumn.description());
+          assertNotNull(addColumn.move());
+          assertEquals(MoveType.FIRST, addColumn.move().type());
+          assertEquals("col_1", addColumn.move().fieldName());
+          assertNull(addColumn.move().referenceFieldName());
+          assertFalse(addColumn.dataType().isNullable());
+        });
+  }
+
+  @Test
+  void testAddColumnAfter() {
+    assertTableChange(
+        addColumn(
+            getfieldName("col_2"),
+            FloatType.get(),
+            AddColumn.class.getSimpleName(),
+            after("col_1"),
+            true,
+            false),
+        AddColumn.class,
+        schemaChange -> {
+          AddColumn addColumn = (AddColumn) schemaChange;
+          assertEquals("col_2", addColumn.fieldName());
+          assertEquals(DataTypeRoot.FLOAT, addColumn.dataType().getTypeRoot());
+          assertEquals(AddColumn.class.getSimpleName(), 
addColumn.description());
+          assertNotNull(addColumn.move());
+          assertEquals(MoveType.AFTER, addColumn.move().type());
+          assertEquals("col_2", addColumn.move().fieldName());
+          assertEquals("col_1", addColumn.move().referenceFieldName());
+          assertTrue(addColumn.dataType().isNullable());
+        });
+  }
+
+  @Test
+  void testAddColumnDefaultPosition() {
+    assertTableChange(
+        addColumn(
+            getfieldName("col_3"),
+            ListType.of(StringType.get(), false),
+            AddColumn.class.getSimpleName(),
+            defaultPos(),
+            false,
+            false),
+        AddColumn.class,
+        schemaChange -> {
+          AddColumn addColumn = (AddColumn) schemaChange;
+          assertEquals("col_3", addColumn.fieldName());
+          assertEquals(DataTypeRoot.ARRAY, addColumn.dataType().getTypeRoot());
+          assertEquals(AddColumn.class.getSimpleName(), 
addColumn.description());
+          assertNull(addColumn.move());
+          assertFalse(addColumn.dataType().isNullable());
+        });
+  }
+
+  @Test
+  void testAddColumnWitNullPosition() {
+    assertTableChange(
+        addColumn(
+            getfieldName("col_4"),
+            MapType.of(StringType.get(), IntegerType.get(), true),
+            AddColumn.class.getSimpleName(),
+            null,
+            false,
+            false),
+        AddColumn.class,
+        schemaChange -> {
+          AddColumn addColumn = (AddColumn) schemaChange;
+          assertEquals("col_4", addColumn.fieldName());
+          assertEquals(DataTypeRoot.MAP, addColumn.dataType().getTypeRoot());
+          assertEquals(AddColumn.class.getSimpleName(), 
addColumn.description());
+          assertNull(addColumn.move());
+          assertFalse(addColumn.dataType().isNullable());
+        });
+  }
+
+  @Test
+  void testupdateColumnComment() {
+    assertTableChange(
+        updateColumnComment(getfieldName("col_1"), 
UpdateColumnComment.class.getSimpleName()),
+        UpdateColumnComment.class,
+        schemaChange -> {
+          UpdateColumnComment updateColumnComment = (UpdateColumnComment) 
schemaChange;
+          assertEquals("col_1", 
getfieldName(updateColumnComment.fieldNames()));
+          assertEquals(
+              UpdateColumnComment.class.getSimpleName(), 
updateColumnComment.newDescription());
+        });
+  }
+
+  @Test
+  void testUpdateColumnNullability() {
+    assertTableChange(
+        updateColumnNullability(getfieldName("col_2"), false),
+        UpdateColumnNullability.class,
+        schemaChange -> {
+          UpdateColumnNullability updateColumnNullability = 
(UpdateColumnNullability) schemaChange;
+          assertEquals("col_2", 
getfieldName(updateColumnNullability.fieldNames()));
+          assertFalse(updateColumnNullability.newNullability());
+        });
+  }
+
+  @Test
+  void testUpdateColumnType() {
+    assertTableChange(
+        updateColumnType(getfieldName("col_4"), DoubleType.get()),
+        UpdateColumnType.class,
+        schemaChange -> {
+          UpdateColumnType updateColumnType = (UpdateColumnType) schemaChange;
+          assertEquals("col_4", updateColumnType.fieldName());
+          assertEquals(DataTypeRoot.DOUBLE, 
updateColumnType.newDataType().getTypeRoot());
+        });
+  }
+
+  @Test
+  void testRenameColumn() {
+    assertTableChange(
+        renameColumn(getfieldName("col_1"), "col_5"),
+        RenameColumn.class,
+        schemaChange -> {
+          RenameColumn renameColumn = (RenameColumn) schemaChange;
+          assertEquals("col_1", renameColumn.fieldName());
+          assertEquals("col_5", renameColumn.newName());
+        });
+  }
+
+  @Test
+  void testDeleteColumn() {
+    assertTableChange(
+        deleteColumn(getfieldName("col_2"), true),
+        DropColumn.class,
+        schemaChange -> {
+          DropColumn dropColumn = (DropColumn) schemaChange;
+          assertEquals("col_2", dropColumn.fieldName());
+        });
+  }
+
+  @Test
+  void testUpdateComment() {
+    assertTableChange(
+        updateComment(UpdateComment.class.getSimpleName()),
+        UpdateComment.class,
+        schemaChange -> {
+          UpdateComment updateComment = (UpdateComment) schemaChange;
+          assertEquals(UpdateComment.class.getSimpleName(), 
updateComment.comment());
+        });
+  }
+
+  @Test
+  void testSetProperty() {
+    assertTableChange(
+        setProperty("prop_k1", "prop_v1"),
+        SetOption.class,
+        schemaChange -> {
+          SetOption setOption = (SetOption) schemaChange;
+          assertEquals("prop_k1", setOption.key());
+          assertEquals("prop_v1", setOption.value());
+        });
+  }
+
+  @Test
+  void testRemoveProperty() {
+    assertTableChange(
+        removeProperty("prop_k1"),
+        RemoveOption.class,
+        schemaChange -> {
+          RemoveOption removeOption = (RemoveOption) schemaChange;
+          assertEquals("prop_k1", removeOption.key());
+        });
+  }
+
+  @Test
+  void testUpdateColumnPosition() {
+    assertTableChange(
+        updateColumnPosition(getfieldName("col_3"), after("col_1")),
+        UpdateColumnPosition.class,
+        schemaChange -> {
+          UpdateColumnPosition updateColumnPosition = (UpdateColumnPosition) 
schemaChange;
+          assertEquals("col_3", updateColumnPosition.move().fieldName());
+          assertEquals("col_1", 
updateColumnPosition.move().referenceFieldName());
+        });
+  }
+
+  @Test
+  void testUnsupportedTableChanges() {
+    // Test UnsupportedOperationException with AddIndex, DeleteIndex, 
RenameTable,
+    // UpdateColumnAutoIncrement, UpdateColumnDefaultValue.
+    Arrays.asList(
+            addIndex(IndexType.UNIQUE_KEY, "uk", new String[][] {{"col_5"}}),
+            deleteIndex("uk", true),
+            rename("tb_1"),
+            updateColumnAutoIncrement(getfieldName("col_5"), true),
+            updateColumnDefaultValue(
+                getfieldName("col_5"), Literals.of("default", 
Types.VarCharType.of(255))))
+        .forEach(this::assertUnsupportedTableChange);
+
+    // Test IllegalArgumentException with AddColumn default value and auto 
increment.
+    Arrays.asList(
+            Pair.of(
+                addColumn(
+                    getfieldName("col_1"),
+                    IntegerType.get(),
+                    AddColumn.class.getSimpleName(),
+                    TableChange.ColumnPosition.first(),
+                    false,
+                    false,
+                    Literals.of("default", Types.StringType.get())),
+                "Paimon set column default value through table properties 
instead of column info. Illegal column: col_1."),
+            Pair.of(
+                addColumn(
+                    getfieldName("col_1"),
+                    IntegerType.get(),
+                    AddColumn.class.getSimpleName(),
+                    TableChange.ColumnPosition.first(),
+                    false,
+                    true),
+                "Paimon does not support auto increment column. Illegal 
column: col_1."))
+        .forEach(this::assertIllegalTableChange);
+  }
+
+  private void assertTableChange(
+      TableChange tableChange, Class<?> expected, Consumer<SchemaChange> 
consumer) {
+    SchemaChange schemaChange = buildSchemaChange(tableChange);
+    assertEquals(expected, schemaChange.getClass());
+    consumer.accept(schemaChange);
+  }
+
+  private void assertUnsupportedTableChange(TableChange tableChange) {
+    UnsupportedOperationException exception =
+        assertThrowsExactly(
+            UnsupportedOperationException.class, () -> 
buildSchemaChange(tableChange));
+    assertEquals(
+        String.format(
+            "Paimon does not support %s table change.", 
tableChange.getClass().getSimpleName()),
+        exception.getMessage());
+  }
+
+  private void assertIllegalTableChange(Pair<TableChange, String> tableChange) 
{
+    IllegalArgumentException exception =
+        assertThrowsExactly(
+            IllegalArgumentException.class, () -> 
buildSchemaChange(tableChange.getKey()));
+    assertEquals(tableChange.getValue(), exception.getMessage());
+  }
+}

Reply via email to