This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 3448dca2c [#3981]feat(flink-connector):Support alter table operation 
for hive (#4097)
3448dca2c is described below

commit 3448dca2c8394b83214be79cc8d9f8f925095af6
Author: Peidian li <[email protected]>
AuthorDate: Wed Jul 24 21:05:46 2024 +0800

    [#3981]feat(flink-connector):Support alter table operation for hive (#4097)
    
    ### What changes were proposed in this pull request?
    
    - Support the ALTER TABLE operation for Hive tables in the Flink connector.
    
    ### Why are the changes needed?
    
    - Fix: #3981
    
    ### Does this PR introduce _any_ user-facing change?
    
    - no
    
    ### How was this patch tested?
    
    - Added unit tests (TestBaseCatalog, TestTypeUtils) and integration tests (FlinkCommonIT).
---
 .../flink/connector/catalog/BaseCatalog.java       | 167 +++++++++++-
 .../flink/connector/utils/TableUtils.java          |  44 ++++
 .../gravitino/flink/connector/utils/TypeUtils.java |   4 +
 .../flink/connector/catalog/TestBaseCatalog.java   |  67 +++++
 .../connector/integration/test/FlinkCommonIT.java  | 281 ++++++++++++++++++++-
 .../flink/connector/utils/TestTypeUtils.java       |   3 +
 6 files changed, 563 insertions(+), 3 deletions(-)

diff --git 
a/flink-connector/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java
 
b/flink-connector/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java
index 7fdc8f108..6b76e31b8 100644
--- 
a/flink-connector/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java
+++ 
b/flink-connector/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java
@@ -20,11 +20,13 @@
 package org.apache.gravitino.flink.connector.catalog;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.MapDifference;
 import com.google.common.collect.Maps;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.commons.compress.utils.Lists;
@@ -68,6 +70,7 @@ import 
org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
 import org.apache.gravitino.exceptions.TableAlreadyExistsException;
 import org.apache.gravitino.flink.connector.PartitionConverter;
 import org.apache.gravitino.flink.connector.PropertiesConverter;
+import org.apache.gravitino.flink.connector.utils.TableUtils;
 import org.apache.gravitino.flink.connector.utils.TypeUtils;
 import org.apache.gravitino.rel.Column;
 import org.apache.gravitino.rel.Table;
@@ -254,6 +257,8 @@ public abstract class BaseCatalog extends AbstractCatalog {
   @Override
   public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
       throws TableAlreadyExistException, DatabaseNotExistException, 
CatalogException {
+    Preconditions.checkArgument(
+        table instanceof ResolvedCatalogBaseTable, "table should be resolved");
     NameIdentifier identifier =
         NameIdentifier.of(tablePath.getDatabaseName(), 
tablePath.getObjectName());
 
@@ -280,10 +285,72 @@ public abstract class BaseCatalog extends AbstractCatalog 
{
     }
   }
 
+  /**
+   * This method only changes the table comment. To alter columns, use the 
other alterTable API
+   * and provide a list of TableChanges.
+   *
+   * @param tablePath path of the table or view to be modified
+   * @param newTable the new table definition
+   * @param ignoreIfNotExists flag to specify behavior when the table or view 
does not exist: if set
+   *     to false, throw an exception, if set to true, do nothing.
+   * @throws TableNotExistException if the table does not exist.
+   * @throws CatalogException in case of any runtime exception.
+   */
   @Override
-  public void alterTable(ObjectPath objectPath, CatalogBaseTable 
catalogBaseTable, boolean b)
+  public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
       throws TableNotExistException, CatalogException {
-    throw new UnsupportedOperationException();
+    CatalogBaseTable existingTable;
+
+    try {
+      existingTable = this.getTable(tablePath);
+    } catch (TableNotExistException e) {
+      if (!ignoreIfNotExists) {
+        throw e;
+      }
+      return;
+    }
+
+    if (existingTable.getTableKind() != newTable.getTableKind()) {
+      throw new CatalogException(
+          String.format(
+              "Table types don't match. Existing table is '%s' and new table 
is '%s'.",
+              existingTable.getTableKind(), newTable.getTableKind()));
+    }
+
+    NameIdentifier identifier =
+        NameIdentifier.of(tablePath.getDatabaseName(), 
tablePath.getObjectName());
+    catalog()
+        .asTableCatalog()
+        .alterTable(identifier, getGravitinoTableChanges(existingTable, 
newTable));
+  }
+
+  @Override
+  public void alterTable(
+      ObjectPath tablePath,
+      CatalogBaseTable newTable,
+      List<org.apache.flink.table.catalog.TableChange> tableChanges,
+      boolean ignoreIfNotExists)
+      throws TableNotExistException, CatalogException {
+    CatalogBaseTable existingTable;
+    try {
+      existingTable = this.getTable(tablePath);
+    } catch (TableNotExistException e) {
+      if (!ignoreIfNotExists) {
+        throw e;
+      }
+      return;
+    }
+
+    if (existingTable.getTableKind() != newTable.getTableKind()) {
+      throw new CatalogException(
+          String.format(
+              "Table types don't match. Existing table is '%s' and new table 
is '%s'.",
+              existingTable.getTableKind(), newTable.getTableKind()));
+    }
+
+    NameIdentifier identifier =
+        NameIdentifier.of(tablePath.getDatabaseName(), 
tablePath.getObjectName());
+    catalog().asTableCatalog().alterTable(identifier, 
getGravitinoTableChanges(tableChanges));
   }
 
   @Override
@@ -470,6 +537,102 @@ public abstract class BaseCatalog extends AbstractCatalog 
{
         null);
   }
 
+  private static void removeProperty(
+      org.apache.flink.table.catalog.TableChange.ResetOption change, 
List<TableChange> changes) {
+    changes.add(TableChange.removeProperty(change.getKey()));
+  }
+
+  private static void setProperty(
+      org.apache.flink.table.catalog.TableChange.SetOption change, 
List<TableChange> changes) {
+    changes.add(TableChange.setProperty(change.getKey(), change.getValue()));
+  }
+
+  private static void dropColumn(
+      org.apache.flink.table.catalog.TableChange.DropColumn change, 
List<TableChange> changes) {
+    changes.add(TableChange.deleteColumn(new String[] 
{change.getColumnName()}, true));
+  }
+
+  private static void addColumn(
+      org.apache.flink.table.catalog.TableChange.AddColumn change, 
List<TableChange> changes) {
+    changes.add(
+        TableChange.addColumn(
+            new String[] {change.getColumn().getName()},
+            
TypeUtils.toGravitinoType(change.getColumn().getDataType().getLogicalType()),
+            change.getColumn().getComment().orElse(null),
+            TableUtils.toGravitinoColumnPosition(change.getPosition())));
+  }
+
+  private static void modifyColumn(
+      org.apache.flink.table.catalog.TableChange change, List<TableChange> 
changes) {
+    if (change instanceof 
org.apache.flink.table.catalog.TableChange.ModifyColumnName) {
+      org.apache.flink.table.catalog.TableChange.ModifyColumnName 
modifyColumnName =
+          (org.apache.flink.table.catalog.TableChange.ModifyColumnName) change;
+      changes.add(
+          TableChange.renameColumn(
+              new String[] {modifyColumnName.getOldColumnName()},
+              modifyColumnName.getNewColumnName()));
+    } else if (change
+        instanceof 
org.apache.flink.table.catalog.TableChange.ModifyPhysicalColumnType) {
+      org.apache.flink.table.catalog.TableChange.ModifyPhysicalColumnType 
modifyColumnType =
+          
(org.apache.flink.table.catalog.TableChange.ModifyPhysicalColumnType) change;
+      changes.add(
+          TableChange.updateColumnType(
+              new String[] {modifyColumnType.getOldColumn().getName()},
+              
TypeUtils.toGravitinoType(modifyColumnType.getNewType().getLogicalType())));
+    } else if (change instanceof 
org.apache.flink.table.catalog.TableChange.ModifyColumnPosition) {
+      org.apache.flink.table.catalog.TableChange.ModifyColumnPosition 
modifyColumnPosition =
+          (org.apache.flink.table.catalog.TableChange.ModifyColumnPosition) 
change;
+      changes.add(
+          TableChange.updateColumnPosition(
+              new String[] {modifyColumnPosition.getOldColumn().getName()},
+              
TableUtils.toGravitinoColumnPosition(modifyColumnPosition.getNewPosition())));
+    } else if (change instanceof 
org.apache.flink.table.catalog.TableChange.ModifyColumnComment) {
+      org.apache.flink.table.catalog.TableChange.ModifyColumnComment 
modifyColumnComment =
+          (org.apache.flink.table.catalog.TableChange.ModifyColumnComment) 
change;
+      changes.add(
+          TableChange.updateColumnComment(
+              new String[] {modifyColumnComment.getOldColumn().getName()},
+              modifyColumnComment.getNewComment()));
+    } else {
+      throw new IllegalArgumentException(
+          String.format("Not support ModifyColumn : %s", change.getClass()));
+    }
+  }
+
+  @VisibleForTesting
+  static TableChange[] getGravitinoTableChanges(
+      CatalogBaseTable existingTable, CatalogBaseTable newTable) {
+    Preconditions.checkNotNull(newTable.getComment(), "The new comment should 
not be null");
+    List<TableChange> changes = Lists.newArrayList();
+    if (!Objects.equals(newTable.getComment(), existingTable.getComment())) {
+      changes.add(TableChange.updateComment(newTable.getComment()));
+    }
+    return changes.toArray(new TableChange[0]);
+  }
+
+  @VisibleForTesting
+  static TableChange[] getGravitinoTableChanges(
+      List<org.apache.flink.table.catalog.TableChange> tableChanges) {
+    List<TableChange> changes = Lists.newArrayList();
+    for (org.apache.flink.table.catalog.TableChange change : tableChanges) {
+      if (change instanceof 
org.apache.flink.table.catalog.TableChange.AddColumn) {
+        addColumn((org.apache.flink.table.catalog.TableChange.AddColumn) 
change, changes);
+      } else if (change instanceof 
org.apache.flink.table.catalog.TableChange.DropColumn) {
+        dropColumn((org.apache.flink.table.catalog.TableChange.DropColumn) 
change, changes);
+      } else if (change instanceof 
org.apache.flink.table.catalog.TableChange.ModifyColumn) {
+        modifyColumn(change, changes);
+      } else if (change instanceof 
org.apache.flink.table.catalog.TableChange.SetOption) {
+        setProperty((org.apache.flink.table.catalog.TableChange.SetOption) 
change, changes);
+      } else if (change instanceof 
org.apache.flink.table.catalog.TableChange.ResetOption) {
+        
removeProperty((org.apache.flink.table.catalog.TableChange.ResetOption) change, 
changes);
+      } else {
+        throw new UnsupportedOperationException(
+            String.format("Not supported change : %s", change.getClass()));
+      }
+    }
+    return changes.toArray(new TableChange[0]);
+  }
+
   @VisibleForTesting
   static SchemaChange[] getSchemaChange(CatalogDatabase current, 
CatalogDatabase updated) {
     Map<String, String> currentProperties = current.getProperties();
diff --git 
a/flink-connector/src/main/java/org/apache/gravitino/flink/connector/utils/TableUtils.java
 
b/flink-connector/src/main/java/org/apache/gravitino/flink/connector/utils/TableUtils.java
new file mode 100644
index 000000000..68bae4180
--- /dev/null
+++ 
b/flink-connector/src/main/java/org/apache/gravitino/flink/connector/utils/TableUtils.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.flink.connector.utils;
+
+import org.apache.gravitino.rel.TableChange;
+
+public class TableUtils {
+  private TableUtils() {}
+
+  public static TableChange.ColumnPosition toGravitinoColumnPosition(
+      org.apache.flink.table.catalog.TableChange.ColumnPosition 
columnPosition) {
+    if (columnPosition == null) {
+      return null;
+    }
+
+    if (columnPosition instanceof 
org.apache.flink.table.catalog.TableChange.First) {
+      return TableChange.ColumnPosition.first();
+    } else if (columnPosition instanceof 
org.apache.flink.table.catalog.TableChange.After) {
+      org.apache.flink.table.catalog.TableChange.After after =
+          (org.apache.flink.table.catalog.TableChange.After) columnPosition;
+      return TableChange.ColumnPosition.after(after.column());
+    } else {
+      throw new IllegalArgumentException(
+          String.format("Not support column position : %s", 
columnPosition.getClass()));
+    }
+  }
+}
diff --git 
a/flink-connector/src/main/java/org/apache/gravitino/flink/connector/utils/TypeUtils.java
 
b/flink-connector/src/main/java/org/apache/gravitino/flink/connector/utils/TypeUtils.java
index a11e20cb3..7c5635037 100644
--- 
a/flink-connector/src/main/java/org/apache/gravitino/flink/connector/utils/TypeUtils.java
+++ 
b/flink-connector/src/main/java/org/apache/gravitino/flink/connector/utils/TypeUtils.java
@@ -37,6 +37,8 @@ public class TypeUtils {
         return Types.DoubleType.get();
       case INTEGER:
         return Types.IntegerType.get();
+      case BIGINT:
+        return Types.LongType.get();
       default:
         throw new UnsupportedOperationException(
             "Not support type: " + logicalType.asSummaryString());
@@ -51,6 +53,8 @@ public class TypeUtils {
         return DataTypes.STRING();
       case INTEGER:
         return DataTypes.INT();
+      case LONG:
+        return DataTypes.BIGINT();
       default:
         throw new UnsupportedOperationException("Not support " + 
gravitinoType.toString());
     }
diff --git 
a/flink-connector/src/test/java/org/apache/gravitino/flink/connector/catalog/TestBaseCatalog.java
 
b/flink-connector/src/test/java/org/apache/gravitino/flink/connector/catalog/TestBaseCatalog.java
index a016a9603..89d25f8bd 100644
--- 
a/flink-connector/src/test/java/org/apache/gravitino/flink/connector/catalog/TestBaseCatalog.java
+++ 
b/flink-connector/src/test/java/org/apache/gravitino/flink/connector/catalog/TestBaseCatalog.java
@@ -18,11 +18,20 @@
  */
 package org.apache.gravitino.flink.connector.catalog;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import java.util.List;
 import java.util.Map;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
 import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.TableChange;
 import org.apache.gravitino.SchemaChange;
+import org.apache.gravitino.rel.types.Types;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -49,4 +58,62 @@ public class TestBaseCatalog {
     Assertions.assertEquals("key2", ((SchemaChange.SetProperty) 
schemaChange[2]).getProperty());
     Assertions.assertEquals("new-value2", ((SchemaChange.SetProperty) 
schemaChange[2]).getValue());
   }
+
+  @Test
+  public void testTableChanges() {
+    List<TableChange> tableChanges =
+        ImmutableList.of(
+            TableChange.add(Column.physical("test", DataTypes.INT())),
+            TableChange.modifyPhysicalColumnType(
+                Column.physical("test", DataTypes.INT()), DataTypes.DOUBLE()),
+            TableChange.modifyColumnName(Column.physical("test", 
DataTypes.INT()), "test2"),
+            TableChange.dropColumn("aaa"),
+            TableChange.modifyColumnComment(
+                Column.physical("test", DataTypes.INT()), "new comment"),
+            TableChange.modifyColumnPosition(
+                Column.physical("test", DataTypes.INT()),
+                TableChange.ColumnPosition.after("test2")),
+            TableChange.modifyColumnPosition(
+                Column.physical("test", DataTypes.INT()), 
TableChange.ColumnPosition.first()),
+            TableChange.set("key", "value"),
+            TableChange.reset("key"));
+
+    List<org.apache.gravitino.rel.TableChange> expected =
+        ImmutableList.of(
+            org.apache.gravitino.rel.TableChange.addColumn(
+                new String[] {"test"}, Types.IntegerType.get()),
+            org.apache.gravitino.rel.TableChange.updateColumnType(
+                new String[] {"test"}, Types.DoubleType.get()),
+            org.apache.gravitino.rel.TableChange.renameColumn(new String[] 
{"test"}, "test2"),
+            org.apache.gravitino.rel.TableChange.deleteColumn(new String[] 
{"aaa"}, true),
+            org.apache.gravitino.rel.TableChange.updateColumnComment(
+                new String[] {"test"}, "new comment"),
+            org.apache.gravitino.rel.TableChange.updateColumnPosition(
+                new String[] {"test"},
+                
org.apache.gravitino.rel.TableChange.ColumnPosition.after("test2")),
+            org.apache.gravitino.rel.TableChange.updateColumnPosition(
+                new String[] {"test"}, 
org.apache.gravitino.rel.TableChange.ColumnPosition.first()),
+            org.apache.gravitino.rel.TableChange.setProperty("key", "value"),
+            org.apache.gravitino.rel.TableChange.removeProperty("key"));
+
+    org.apache.gravitino.rel.TableChange[] gravitinoTableChanges =
+        BaseCatalog.getGravitinoTableChanges(tableChanges);
+    Assertions.assertArrayEquals(expected.toArray(), gravitinoTableChanges);
+  }
+
+  @Test
+  public void testTableChangesWithoutColumnChange() {
+    Schema schema = Schema.newBuilder().column("test", "INT").build();
+    CatalogBaseTable table =
+        CatalogTable.of(
+            schema, "test", ImmutableList.of(), ImmutableMap.of("key", 
"value", "key2", "value2"));
+    CatalogBaseTable newTable =
+        CatalogTable.of(
+            schema, "new comment", ImmutableList.of(), ImmutableMap.of("key", 
"new value"));
+    org.apache.gravitino.rel.TableChange[] tableChanges =
+        BaseCatalog.getGravitinoTableChanges(table, newTable);
+    List<org.apache.gravitino.rel.TableChange> expected =
+        
ImmutableList.of(org.apache.gravitino.rel.TableChange.updateComment("new 
comment"));
+    Assertions.assertArrayEquals(expected.toArray(), tableChanges);
+  }
 }
diff --git 
a/flink-connector/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java
 
b/flink-connector/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java
index 06905fff9..46b1d1d23 100644
--- 
a/flink-connector/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java
+++ 
b/flink-connector/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java
@@ -22,15 +22,25 @@ package 
org.apache.gravitino.flink.connector.integration.test;
 import static 
org.apache.gravitino.flink.connector.integration.test.utils.TestUtils.assertColumns;
 import static 
org.apache.gravitino.flink.connector.integration.test.utils.TestUtils.toFlinkPhysicalColumn;
 import static 
org.apache.gravitino.rel.expressions.transforms.Transforms.EMPTY_TRANSFORM;
+import static org.junit.jupiter.api.Assertions.fail;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import org.apache.commons.compress.utils.Lists;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.ResultKind;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.types.Row;
 import org.apache.gravitino.Catalog;
@@ -325,9 +335,278 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
             CatalogTable catalogTable = (CatalogTable) table;
             Assertions.assertFalse(catalogTable.isPartitioned());
           } catch (TableNotExistException e) {
-            Assertions.fail(e);
+            fail(e);
           }
         },
         true);
   }
+
+  @Test
+  public void testRenameColumn() {
+    String databaseName = "test_renam_column_db";
+    String tableName = "test_rename_column";
+    doWithSchema(
+        currentCatalog(),
+        databaseName,
+        catalog -> {
+          TableResult result =
+              sql(
+                  "CREATE TABLE %s "
+                      + "(user_id INT COMMENT 'USER_ID', "
+                      + " order_amount DOUBLE COMMENT 'ORDER_AMOUNT')"
+                      + " COMMENT 'test comment'",
+                  tableName);
+          TestUtils.assertTableResult(result, ResultKind.SUCCESS);
+
+          result = sql("ALTER TABLE %s RENAME user_id TO user_id_new", 
tableName);
+          TestUtils.assertTableResult(result, ResultKind.SUCCESS);
+
+          Column[] actual =
+              catalog
+                  .asTableCatalog()
+                  .loadTable(NameIdentifier.of(databaseName, tableName))
+                  .columns();
+          Column[] expected =
+              new Column[] {
+                Column.of("user_id_new", Types.IntegerType.get(), "USER_ID"),
+                Column.of("order_amount", Types.DoubleType.get(), 
"ORDER_AMOUNT")
+              };
+          assertColumns(expected, actual);
+        },
+        true);
+  }
+
+  @Test
+  public void testAlterTableComment() {
+    String databaseName = "test_alter_table_comment_database";
+    String tableName = "test_alter_table_comment";
+    String newComment = "new_table_comment";
+    doWithSchema(
+        currentCatalog(),
+        databaseName,
+        catalog -> {
+          Optional<org.apache.flink.table.catalog.Catalog> flinkCatalog =
+              tableEnv.getCatalog(currentCatalog().name());
+          if (flinkCatalog.isPresent()) {
+            org.apache.flink.table.catalog.Catalog currentFlinkCatalog = 
flinkCatalog.get();
+            ObjectPath currentTablePath = new ObjectPath(databaseName, 
tableName);
+            try {
+              // use java api to create a new table
+              org.apache.flink.table.api.Schema schema =
+                  org.apache.flink.table.api.Schema.newBuilder()
+                      .column("test", DataTypes.INT())
+                      .build();
+              CatalogTable newTable =
+                  CatalogTable.of(schema, "test comment", ImmutableList.of(), 
ImmutableMap.of());
+              List<org.apache.flink.table.catalog.Column> columns = 
Lists.newArrayList();
+              
columns.add(org.apache.flink.table.catalog.Column.physical("test", 
DataTypes.INT()));
+              ResolvedSchema resolvedSchema = new ResolvedSchema(columns, new 
ArrayList<>(), null);
+              currentFlinkCatalog.createTable(
+                  currentTablePath, new ResolvedCatalogTable(newTable, 
resolvedSchema), false);
+              CatalogTable table = (CatalogTable) 
currentFlinkCatalog.getTable(currentTablePath);
+
+              // alter table comment
+              currentFlinkCatalog.alterTable(
+                  currentTablePath,
+                  CatalogTable.of(
+                      table.getUnresolvedSchema(),
+                      newComment,
+                      table.getPartitionKeys(),
+                      table.getOptions()),
+                  false);
+
+              CatalogTable loadedTable =
+                  (CatalogTable) 
currentFlinkCatalog.getTable(currentTablePath);
+              Assertions.assertEquals(newComment, loadedTable.getComment());
+              Table gravitinoTable =
+                  currentCatalog()
+                      .asTableCatalog()
+                      .loadTable(NameIdentifier.of(databaseName, tableName));
+              Assertions.assertEquals(newComment, gravitinoTable.comment());
+            } catch (DatabaseNotExistException
+                | TableAlreadyExistException
+                | TableNotExistException e) {
+              fail(e);
+            }
+          } else {
+            fail("Catalog doesn't exist");
+          }
+        },
+        true);
+  }
+
+  @Test
+  public void testAlterTableAddColumn() {
+    String databaseName = "test_alter_table_add_column_db";
+    String tableName = "test_alter_table_add_column";
+    doWithSchema(
+        currentCatalog(),
+        databaseName,
+        catalog -> {
+          TableResult result =
+              sql(
+                  "CREATE TABLE %s "
+                      + "(user_id INT COMMENT 'USER_ID', "
+                      + " order_amount INT COMMENT 'ORDER_AMOUNT')"
+                      + " COMMENT 'test comment'",
+                  tableName);
+          TestUtils.assertTableResult(result, ResultKind.SUCCESS);
+          result = sql("ALTER TABLE %s ADD new_column_2 INT AFTER 
order_amount", tableName);
+          TestUtils.assertTableResult(result, ResultKind.SUCCESS);
+
+          Column[] actual =
+              catalog
+                  .asTableCatalog()
+                  .loadTable(NameIdentifier.of(databaseName, tableName))
+                  .columns();
+          Column[] expected =
+              new Column[] {
+                Column.of("user_id", Types.IntegerType.get(), "USER_ID"),
+                Column.of("order_amount", Types.IntegerType.get(), 
"ORDER_AMOUNT"),
+                Column.of("new_column_2", Types.IntegerType.get(), null),
+              };
+          assertColumns(expected, actual);
+        },
+        true);
+  }
+
+  @Test
+  public void testAlterTableDropColumn() {
+    String databaseName = "test_alter_table_drop_column_db";
+    String tableName = "test_alter_table_drop_column";
+    doWithSchema(
+        currentCatalog(),
+        databaseName,
+        catalog -> {
+          TableResult result =
+              sql(
+                  "CREATE TABLE %s "
+                      + "(user_id INT COMMENT 'USER_ID', "
+                      + " order_amount INT COMMENT 'ORDER_AMOUNT')"
+                      + " COMMENT 'test comment'",
+                  tableName);
+          TestUtils.assertTableResult(result, ResultKind.SUCCESS);
+          result = sql("ALTER TABLE %s DROP user_id", tableName);
+          TestUtils.assertTableResult(result, ResultKind.SUCCESS);
+          Column[] actual =
+              catalog
+                  .asTableCatalog()
+                  .loadTable(NameIdentifier.of(databaseName, tableName))
+                  .columns();
+          Column[] expected =
+              new Column[] {Column.of("order_amount", Types.IntegerType.get(), 
"ORDER_AMOUNT")};
+          assertColumns(expected, actual);
+        },
+        true);
+  }
+
+  @Test
+  public void testAlterColumnTypeAndChangeOrder() {
+    String databaseName = "test_alter_table_alter_column_db";
+    String tableName = "test_alter_table_rename_column";
+    doWithSchema(
+        currentCatalog(),
+        databaseName,
+        catalog -> {
+          TableResult result =
+              sql(
+                  "CREATE TABLE %s "
+                      + "(user_id BIGINT COMMENT 'USER_ID', "
+                      + " order_amount INT COMMENT 'ORDER_AMOUNT')"
+                      + " COMMENT 'test comment'"
+                      + " WITH ("
+                      + "'%s' = '%s')",
+                  tableName, "test key", "test value");
+          TestUtils.assertTableResult(result, ResultKind.SUCCESS);
+          result =
+              sql("ALTER TABLE %s MODIFY order_amount BIGINT COMMENT 'new 
comment2'", tableName);
+          TestUtils.assertTableResult(result, ResultKind.SUCCESS);
+          result =
+              sql(
+                  "ALTER TABLE %s MODIFY user_id BIGINT COMMENT 'new comment' 
AFTER order_amount",
+                  tableName);
+          TestUtils.assertTableResult(result, ResultKind.SUCCESS);
+          Column[] actual =
+              catalog
+                  .asTableCatalog()
+                  .loadTable(NameIdentifier.of(databaseName, tableName))
+                  .columns();
+          Column[] expected =
+              new Column[] {
+                Column.of("order_amount", Types.LongType.get(), "new 
comment2"),
+                Column.of("user_id", Types.LongType.get(), "new comment")
+              };
+          assertColumns(expected, actual);
+        },
+        true);
+  }
+
+  @Test
+  public void testRenameTable() {
+    String databaseName = "test_rename_table_db";
+    String tableName = "test_rename_table";
+    doWithSchema(
+        currentCatalog(),
+        databaseName,
+        catalog -> {
+          TableResult result =
+              sql(
+                  "CREATE TABLE %s "
+                      + "(user_id INT COMMENT 'USER_ID', "
+                      + " order_amount INT COMMENT 'ORDER_AMOUNT')"
+                      + " COMMENT 'test comment'",
+                  tableName);
+          TestUtils.assertTableResult(result, ResultKind.SUCCESS);
+          String newTableName = "new_rename_table_name";
+          result = sql("ALTER TABLE %s RENAME TO %s", tableName, newTableName);
+          TestUtils.assertTableResult(result, ResultKind.SUCCESS);
+          Assertions.assertFalse(
+              
catalog.asTableCatalog().tableExists(NameIdentifier.of(databaseName, 
tableName)));
+          Assertions.assertTrue(
+              
catalog.asTableCatalog().tableExists(NameIdentifier.of(databaseName, 
newTableName)));
+        },
+        true);
+  }
+
+  @Test
+  public void testAlterTableProperties() {
+    String databaseName = "test_alter_table_properties_db";
+    String tableName = "test_alter_table_properties";
+    doWithSchema(
+        currentCatalog(),
+        databaseName,
+        catalog -> {
+          TableResult result =
+              sql(
+                  "CREATE TABLE %s "
+                      + "(user_id INT COMMENT 'USER_ID', "
+                      + " order_amount INT COMMENT 'ORDER_AMOUNT')"
+                      + " COMMENT 'test comment'"
+                      + " WITH ("
+                      + "'%s' = '%s')",
+                  tableName, "key", "value");
+          TestUtils.assertTableResult(result, ResultKind.SUCCESS);
+          result = sql("ALTER TABLE %s SET ('key2' = 'value2', 'key' = 
'value1')", tableName);
+          TestUtils.assertTableResult(result, ResultKind.SUCCESS);
+          Map<String, String> properties =
+              catalog
+                  .asTableCatalog()
+                  .loadTable(NameIdentifier.of(databaseName, tableName))
+                  .properties();
+
+          Assertions.assertEquals("value1", properties.get("key"));
+          Assertions.assertEquals("value2", properties.get("key2"));
+          result = sql("ALTER TABLE %s RESET ('key2')", tableName);
+          TestUtils.assertTableResult(result, ResultKind.SUCCESS);
+
+          properties =
+              catalog
+                  .asTableCatalog()
+                  .loadTable(NameIdentifier.of(databaseName, tableName))
+                  .properties();
+          Assertions.assertEquals("value1", properties.get("key"));
+          Assertions.assertNull(properties.get("key2"));
+        },
+        true);
+  }
 }
diff --git 
a/flink-connector/src/test/java/org/apache/gravitino/flink/connector/utils/TestTypeUtils.java
 
b/flink-connector/src/test/java/org/apache/gravitino/flink/connector/utils/TestTypeUtils.java
index e6d78f114..e9b1f8343 100644
--- 
a/flink-connector/src/test/java/org/apache/gravitino/flink/connector/utils/TestTypeUtils.java
+++ 
b/flink-connector/src/test/java/org/apache/gravitino/flink/connector/utils/TestTypeUtils.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.flink.connector.utils;
 
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.DoubleType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.UnresolvedUserDefinedType;
@@ -37,6 +38,7 @@ public class TestTypeUtils {
         Types.StringType.get(), TypeUtils.toGravitinoType(new 
VarCharType(Integer.MAX_VALUE)));
     Assertions.assertEquals(Types.DoubleType.get(), 
TypeUtils.toGravitinoType(new DoubleType()));
     Assertions.assertEquals(Types.IntegerType.get(), 
TypeUtils.toGravitinoType(new IntType()));
+    Assertions.assertEquals(Types.LongType.get(), 
TypeUtils.toGravitinoType(new BigIntType()));
     Assertions.assertThrows(
         UnsupportedOperationException.class,
         () ->
@@ -49,6 +51,7 @@ public class TestTypeUtils {
     Assertions.assertEquals(DataTypes.DOUBLE(), 
TypeUtils.toFlinkType(Types.DoubleType.get()));
     Assertions.assertEquals(DataTypes.STRING(), 
TypeUtils.toFlinkType(Types.StringType.get()));
     Assertions.assertEquals(DataTypes.INT(), 
TypeUtils.toFlinkType(Types.IntegerType.get()));
+    Assertions.assertEquals(DataTypes.BIGINT(), 
TypeUtils.toFlinkType(Types.LongType.get()));
     Assertions.assertThrows(
         UnsupportedOperationException.class,
         () -> TypeUtils.toFlinkType(Types.UnparsedType.of("unknown")));

Reply via email to