FANNG1 commented on code in PR #4097:
URL: https://github.com/apache/gravitino/pull/4097#discussion_r1667878094


##########
flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/catalog/BaseCatalog.java:
##########
@@ -273,10 +275,93 @@ public void createTable(ObjectPath tablePath, 
CatalogBaseTable table, boolean ig
     }
   }
 
+  /**
+   * The method only is used to change the properties and comments. To alter 
columns, use the other
+   * alterTable API and provide a list of TableChange's.
+   *
+   * @param tablePath path of the table or view to be modified
+   * @param newTable the new table definition
+   * @param ignoreIfNotExists flag to specify behavior when the table or view 
does not exist: if set
+   *     to false, throw an exception, if set to true, do nothing.
+   * @throws TableNotExistException if the table not exists.
+   * @throws CatalogException in case of any runtime exception.
+   */
   @Override
-  public void alterTable(ObjectPath objectPath, CatalogBaseTable 
catalogBaseTable, boolean b)
+  public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
       throws TableNotExistException, CatalogException {
-    throw new UnsupportedOperationException();
+    CatalogBaseTable existingTable;
+
+    try {
+      existingTable = this.getTable(tablePath);
+    } catch (TableNotExistException e) {
+      if (!ignoreIfNotExists) {
+        throw e;
+      }
+      return;
+    }
+
+    if (existingTable.getTableKind() != newTable.getTableKind()) {
+      throw new CatalogException(
+          String.format(
+              "Table types don't match. Existing table is '%s' and new table 
is '%s'.",
+              existingTable.getTableKind(), newTable.getTableKind()));
+    }
+
+    List<TableChange> changes = Lists.newArrayList();

Review Comment:
   Could you extract this logic into a helper method such as `getTableChanges` 
   and add a corresponding unit test?



##########
flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/catalog/BaseCatalog.java:
##########
@@ -273,10 +275,93 @@ public void createTable(ObjectPath tablePath, 
CatalogBaseTable table, boolean ig
     }
   }
 
+  /**
+   * The method only is used to change the properties and comments. To alter 
columns, use the other
+   * alterTable API and provide a list of TableChange's.
+   *
+   * @param tablePath path of the table or view to be modified
+   * @param newTable the new table definition
+   * @param ignoreIfNotExists flag to specify behavior when the table or view 
does not exist: if set
+   *     to false, throw an exception, if set to true, do nothing.
+   * @throws TableNotExistException if the table not exists.
+   * @throws CatalogException in case of any runtime exception.
+   */
   @Override
-  public void alterTable(ObjectPath objectPath, CatalogBaseTable 
catalogBaseTable, boolean b)
+  public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
       throws TableNotExistException, CatalogException {
-    throw new UnsupportedOperationException();
+    CatalogBaseTable existingTable;
+
+    try {
+      existingTable = this.getTable(tablePath);
+    } catch (TableNotExistException e) {
+      if (!ignoreIfNotExists) {
+        throw e;
+      }
+      return;
+    }
+
+    if (existingTable.getTableKind() != newTable.getTableKind()) {
+      throw new CatalogException(
+          String.format(
+              "Table types don't match. Existing table is '%s' and new table 
is '%s'.",
+              existingTable.getTableKind(), newTable.getTableKind()));
+    }
+
+    List<TableChange> changes = Lists.newArrayList();
+    changes.addAll(optionsChanges(existingTable.getOptions(), 
newTable.getOptions()));
+    if (!Objects.equals(newTable.getComment(), existingTable.getComment())) {
+      changes.add(TableChange.updateComment(newTable.getComment()));
+    }
+
+    NameIdentifier identifier =
+        NameIdentifier.of(tablePath.getDatabaseName(), 
tablePath.getObjectName());
+    catalog().asTableCatalog().alterTable(identifier, changes.toArray(new 
TableChange[0]));
+  }
+
+  @Override
+  public void alterTable(
+      ObjectPath tablePath,
+      CatalogBaseTable newTable,
+      List<org.apache.flink.table.catalog.TableChange> tableChanges,
+      boolean ignoreIfNotExists)
+      throws TableNotExistException, CatalogException {
+    CatalogBaseTable existingTable;
+    try {
+      existingTable = this.getTable(tablePath);
+    } catch (TableNotExistException e) {
+      if (!ignoreIfNotExists) {
+        throw e;
+      }
+      return;
+    }
+
+    if (existingTable.getTableKind() != newTable.getTableKind()) {
+      throw new CatalogException(
+          String.format(
+              "Table types don't match. Existing table is '%s' and new table 
is '%s'.",
+              existingTable.getTableKind(), newTable.getTableKind()));
+    }
+
+    NameIdentifier identifier =

Review Comment:
   Could you extract this logic into a helper method such as `getTableChanges` 
   and add a corresponding unit test?



##########
flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java:
##########
@@ -281,6 +304,215 @@ public void testGetCatalogFromGravitino() {
         numCatalogs, tableEnv.listCatalogs().length, "The created catalog 
should be dropped.");
   }
 
+  @Test
+  public void testRenameColumn() {

Review Comment:
   This logic is general; would it be better to place it in `FlinkCommonIT`?



##########
flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/catalog/BaseCatalog.java:
##########
@@ -273,10 +275,93 @@ public void createTable(ObjectPath tablePath, 
CatalogBaseTable table, boolean ig
     }
   }
 
+  /**
+   * The method only is used to change the properties and comments. To alter 
columns, use the other
+   * alterTable API and provide a list of TableChange's.
+   *
+   * @param tablePath path of the table or view to be modified
+   * @param newTable the new table definition
+   * @param ignoreIfNotExists flag to specify behavior when the table or view 
does not exist: if set
+   *     to false, throw an exception, if set to true, do nothing.
+   * @throws TableNotExistException if the table not exists.
+   * @throws CatalogException in case of any runtime exception.
+   */
   @Override
-  public void alterTable(ObjectPath objectPath, CatalogBaseTable 
catalogBaseTable, boolean b)
+  public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
       throws TableNotExistException, CatalogException {
-    throw new UnsupportedOperationException();
+    CatalogBaseTable existingTable;
+
+    try {

Review Comment:
   It seems there is no need to load the table here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to