Copilot commented on code in PR #9689:
URL: https://github.com/apache/gravitino/pull/9689#discussion_r2747561230


##########
docs/flink-connector/flink-catalog-hive.md:
##########
@@ -23,14 +23,32 @@ Supports most DDL and DML operations in Flink SQL, except such operations:
 - `DELETE` clause
 - `CALL` clause
 
+## Generic table support
+
+Flink generic tables are non-Hive tables. Their schema and partition keys are stored in table
+properties in Hive metastore. Gravitino Flink connector follows the Flink Hive catalog behavior:
+
+- If `connector=hive`, the table is treated as a Hive table and stored with a normal Hive schema.
+- If the connector is missing or not `hive`, the table is treated as a generic table. Gravitino
+  stores an empty Hive schema and serializes schema and partition keys into `flink.*` properties.
+  It also uses the `is_generic` flag when needed for compatibility.
+
+When loading or altering a table, Gravitino Flink connector detects generic tables by the
+`is_generic`, `flink.connector`, and `flink.connector.type` properties. Generic tables are
+reconstructed from the serialized `flink.*` properties. Hive tables continue to use the native
+Hive schema.
+
+:::note
+You must set `connector=hive` explicitly when creating a raw Hive table. Otherwise, the table is created as a managed generic table. The managed table is not recommended to use and is deprecated in Flink.

Review Comment:
   The documentation states "The managed table is not recommended to use and is 
deprecated in Flink," but this statement lacks clarity and context. It would be 
more helpful to:
   
   1. Specify which version of Flink deprecated managed tables
   2. Clarify what users should use instead
   3. Add a reference/link to the relevant Flink documentation about this 
deprecation
   
   This helps users understand the implications and make informed decisions.
   ```suggestion
   You must set `connector=hive` explicitly when creating a raw Hive table. 
Otherwise, the table is created as a managed generic table. Starting from 
Apache Flink 1.15, managed generic tables in the Hive catalog are deprecated 
and should be avoided. Instead, use external generic tables (by specifying an 
explicit connector) or native Hive tables. For more details, see the Flink 
documentation on Hive generic tables: 
https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/hive/hive_catalog/#generic-tables
   ```
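To make the create-time vs. load-time detection rules in the added docs concrete, they can be paraphrased as plain map lookups. A minimal standalone sketch (class and method names are illustrative, not the connector's actual API; the real implementation lives in `FlinkGenericTableUtil`):

```java
import java.util.Map;

public class GenericTableDetection {
    // At CREATE time: any table without connector=hive is treated as generic.
    static boolean isGenericWhenCreate(Map<String, String> options) {
        if (options == null) return true;
        String connector = options.get("connector");
        return connector == null || !"hive".equalsIgnoreCase(connector);
    }

    // At LOAD time: the is_generic flag wins; otherwise fall back to the
    // serialized flink.connector / flink.connector.type properties.
    static boolean isGenericWhenLoad(Map<String, String> properties) {
        if (properties == null) return false; // no properties => raw Hive table
        if (properties.containsKey("is_generic")) {
            return Boolean.parseBoolean(properties.get("is_generic"));
        }
        String connector = properties.get("flink.connector");
        if (connector == null) connector = properties.get("flink.connector.type");
        return connector != null && !"hive".equalsIgnoreCase(connector);
    }

    public static void main(String[] args) {
        System.out.println(isGenericWhenCreate(Map.of("connector", "hive")));  // false
        System.out.println(isGenericWhenCreate(Map.of("connector", "kafka"))); // true
        System.out.println(isGenericWhenLoad(Map.of("is_generic", "true")));   // true
        System.out.println(isGenericWhenLoad(Map.of()));                       // false
    }
}
```

Note the asymmetry: a table with no properties at all is generic at create time but a raw Hive table at load time, matching the doc text above.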



##########
flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalog.java:
##########
@@ -67,4 +92,155 @@ public Optional<Factory> getFactory() {
   protected AbstractCatalog realCatalog() {
     return hiveCatalog;
   }
+
+  @Override
+  public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
+      throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+    Preconditions.checkArgument(
+        table instanceof ResolvedCatalogBaseTable, "table should be resolved");
+
+    if (!FlinkGenericTableUtil.isGenericTableWhenCreate(table.getOptions())) {
+      super.createTable(tablePath, table, ignoreIfExists);
+      return;
+    }
+
+    if (!(table instanceof ResolvedCatalogTable)) {
+      throw new CatalogException("Generic table must be a resolved catalog table");
+    }
+    ResolvedCatalogTable resolvedTable = (ResolvedCatalogTable) table;
+
+    NameIdentifier identifier =
+        NameIdentifier.of(tablePath.getDatabaseName(), tablePath.getObjectName());
+    Map<String, String> properties =
+        FlinkGenericTableUtil.toGravitinoGenericTableProperties(resolvedTable);
+
+    try {
+      catalog()
+          .asTableCatalog()
+          .createTable(
+              identifier,
+              new Column[0],
+              table.getComment(),
+              properties,
+              new Transform[0],
+              Distributions.NONE,
+              new SortOrder[0],
+              new Index[0]);

Review Comment:
   There is no test coverage for generic tables with partitions. While the 
current implementation creates generic tables with empty partitions 
(Transform[0]) in GravitinoHiveCatalog.createTable line 125, it would be 
valuable to add a test case that verifies:
   
   1. Creating a generic table with a PARTITIONED BY clause works correctly (or 
fails with a clear error message)
   2. The behavior is consistent with native Flink Hive catalog
   
   This ensures the partition handling for generic tables is well-tested and 
behaves as expected.



##########
flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/FlinkGenericTableUtil.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.flink.connector.hive;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.flink.table.catalog.CatalogPropertiesUtil;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.gravitino.rel.Table;
+
+final class FlinkGenericTableUtil {
+  public static final String CONNECTOR = FactoryUtil.CONNECTOR.key();
+  private static final String CONNECTOR_TYPE = "connector.type";
+  private static final String MANAGED_TABLE_IDENTIFIER = "default";
+
+  private FlinkGenericTableUtil() {}
+
+  static boolean isGenericTableWhenCreate(Map<String, String> options) {
+    if (options == null) {
+      return true;
+    }
+    String connector = options.get(CONNECTOR);
+    if (connector == null) {
+      return true;
+    }
+    return !"hive".equalsIgnoreCase(connector);
+  }
+
+  static boolean isGenericTableWhenLoad(Map<String, String> properties) {
+    // If a table doesn't have properties, it is a raw hive table.
+    if (properties == null) {
+      return false;
+    }
+    if (properties.containsKey(CatalogPropertiesUtil.IS_GENERIC)) {
+      return Boolean.parseBoolean(properties.get(CatalogPropertiesUtil.IS_GENERIC));
+    }
+    String connector = getConnectorFromProperties(properties);
+    if (connector == null) {
+      return false;
+    }
+    return !"hive".equalsIgnoreCase(connector);
+  }
+
+  static Map<String, String> toGravitinoGenericTableProperties(ResolvedCatalogTable resolvedTable) {
+    Map<String, String> properties = CatalogPropertiesUtil.serializeCatalogTable(resolvedTable);
+    if (!properties.containsKey(CONNECTOR)) {
+      properties.put(CONNECTOR, MANAGED_TABLE_IDENTIFIER);
+    }
+    Map<String, String> masked = maskFlinkProperties(properties);
+    masked.put(CatalogPropertiesUtil.IS_GENERIC, "true");
+    return masked;
+  }
+
+  static CatalogTable toFlinkGenericTable(Table table) {
+    Map<String, String> flinkProperties = unmaskFlinkProperties(table.properties());
+    CatalogTable catalogTable = CatalogPropertiesUtil.deserializeCatalogTable(flinkProperties);
+    if (catalogTable.getUnresolvedSchema().getColumns().isEmpty()) {
+      catalogTable =
+          CatalogPropertiesUtil.deserializeCatalogTable(flinkProperties, "generic.table.schema");
+    }

Review Comment:
   The fallback logic at lines 77-79 attempts to deserialize with a different 
schema prefix ("generic.table.schema") if the initial deserialization results 
in empty columns. However, there's no test coverage for this fallback path.
   
   Consider adding a test case that:
   1. Creates properties that trigger this fallback condition
   2. Verifies the fallback deserialization works correctly
   3. Documents when this fallback is needed (e.g., for backward compatibility 
with specific Flink versions)
   
   This ensures the fallback logic is tested and its purpose is clear to future 
maintainers.
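The fallback the comment describes — deserialize with the default schema prefix first, then retry with an alternate prefix when no columns were found — can be illustrated independently of Flink's `CatalogPropertiesUtil`. A standalone sketch, with a hypothetical flat key layout (`<prefix>.<index>.name`) chosen only to show the shape of the fallback, not the real serialized format:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class SchemaPrefixFallback {
    // Collect column names stored as "<prefix>.<index>.name" entries.
    static List<String> readColumns(Map<String, String> props, String prefix) {
        List<String> columns = new ArrayList<>();
        for (int i = 0; ; i++) {
            String name = props.get(prefix + "." + i + ".name");
            if (name == null) break;
            columns.add(name);
        }
        return columns;
    }

    // Try the primary prefix; if it yields no columns, retry the fallback prefix.
    static List<String> readColumnsWithFallback(Map<String, String> props) {
        List<String> columns = readColumns(props, "schema");
        if (columns.isEmpty()) {
            columns = readColumns(props, "generic.table.schema");
        }
        return columns;
    }

    public static void main(String[] args) {
        Map<String, String> legacy = Map.of(
            "generic.table.schema.0.name", "id",
            "generic.table.schema.1.name", "name");
        System.out.println(readColumnsWithFallback(legacy)); // [id, name]
    }
}
```

A test exercising the fallback path would feed properties keyed only under the alternate prefix, as above, and assert the reconstructed columns are non-empty.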



##########
flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java:
##########
@@ -80,15 +89,32 @@
 public class FlinkHiveCatalogIT extends FlinkCommonIT {
   private static final String DEFAULT_HIVE_CATALOG = "test_flink_hive_schema_catalog";
   private static final String FLINK_USER_NAME = "gravitino";
+  private static final String MYSQL_DATABASE = TestDatabaseName.FLINK_HIVE_CATALOG_IT.name();
 
   private static org.apache.gravitino.Catalog hiveCatalog;
   private static String hiveConfDir;
 
+  private String mysqlUrl;
+  private String mysqlUsername;
+  private String mysqlPassword;
+  private String mysqlDriver;
+  private ContainerSuite containerSuite;
+
   @Override
   protected boolean supportsPrimaryKey() {
     return false;
   }
 
+  @Override
+  protected boolean supportColumnOperation() {
+    return false;
+  }
+
+  @Override
+  protected boolean supportTablePropertiesOperation() {
+    return false;

Review Comment:
   These methods return `false` to disable column and table properties 
operations for generic tables in this test suite. However, the PR adds several 
tests that verify column operations on both raw Hive tables 
(testAlterRawHiveTableAddColumn, testAlterRawHiveTableRenameColumn, 
testAlterRawHiveTableDropColumn) and generic tables 
(testAlterGenericTableAddColumn, testAlterGenericTableRenameColumn, 
testAlterGenericTableDropColumn). 
   
   The tests at lines 1211-1259 specifically test column alterations on generic 
tables, which contradicts the `supportColumnOperation() { return false; }` 
setting. This inconsistency suggests that either:
   1. These override methods should be removed since column operations are now 
supported, or
   2. The generic table column tests should be reconsidered.
   
   Please clarify the intent and ensure consistency between these configuration 
methods and the actual test coverage.
   ```suggestion
       return true;
     }
   
     @Override
     protected boolean supportTablePropertiesOperation() {
       return true;
   ```



##########
flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalog.java:
##########
@@ -67,4 +92,155 @@ public Optional<Factory> getFactory() {
   protected AbstractCatalog realCatalog() {
     return hiveCatalog;
   }
+
+  @Override
+  public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
+      throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+    Preconditions.checkArgument(
+        table instanceof ResolvedCatalogBaseTable, "table should be resolved");
+
+    if (!FlinkGenericTableUtil.isGenericTableWhenCreate(table.getOptions())) {
+      super.createTable(tablePath, table, ignoreIfExists);
+      return;
+    }
+
+    if (!(table instanceof ResolvedCatalogTable)) {
+      throw new CatalogException("Generic table must be a resolved catalog table");
+    }
+    ResolvedCatalogTable resolvedTable = (ResolvedCatalogTable) table;
+
+    NameIdentifier identifier =
+        NameIdentifier.of(tablePath.getDatabaseName(), tablePath.getObjectName());
+    Map<String, String> properties =
+        FlinkGenericTableUtil.toGravitinoGenericTableProperties(resolvedTable);
+
+    try {
+      catalog()
+          .asTableCatalog()
+          .createTable(
+              identifier,
+              new Column[0],
+              table.getComment(),
+              properties,
+              new Transform[0],
+              Distributions.NONE,
+              new SortOrder[0],
+              new Index[0]);
+    } catch (NoSuchSchemaException e) {
+      throw new DatabaseNotExistException(catalogName(), tablePath.getDatabaseName(), e);
+    } catch (TableAlreadyExistsException e) {
+      if (!ignoreIfExists) {
+        throw new TableAlreadyExistException(catalogName(), tablePath, e);
+      }
+    } catch (Exception e) {
+      throw new CatalogException(e);
+    }
+  }
+
+  @Override
+  public CatalogBaseTable getTable(ObjectPath tablePath)
+      throws TableNotExistException, CatalogException {
+    try {
+      Table table =
+          catalog()
+              .asTableCatalog()
+              .loadTable(NameIdentifier.of(tablePath.getDatabaseName(), tablePath.getObjectName()));
+      if (FlinkGenericTableUtil.isGenericTableWhenLoad(table.properties())) {
+        return FlinkGenericTableUtil.toFlinkGenericTable(table);
+      }
+      return super.toFlinkTable(table, tablePath);
+    } catch (NoSuchTableException e) {
+      throw new TableNotExistException(catalogName(), tablePath, e);
+    } catch (Exception e) {
+      throw new CatalogException(e);
+    }
+  }
+
+  @Override
+  public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
+      throws TableNotExistException, CatalogException {
+    Table table = loadGravitinoTable(tablePath, ignoreIfNotExists);
+    if (table == null) {
+      return;
+    }
+    if (!FlinkGenericTableUtil.isGenericTableWhenLoad(table.properties())) {
+      super.alterTable(tablePath, newTable, ignoreIfNotExists);
+      return;
+    }
+    if (!(newTable instanceof ResolvedCatalogTable)) {
+      throw new CatalogException("Generic table must be a resolved catalog table");
+    }
+    applyGenericTableAlter(tablePath, table, (ResolvedCatalogTable) newTable);
+  }
+
+  @Override
+  public void alterTable(
+      ObjectPath tablePath,
+      CatalogBaseTable newTable,
+      java.util.List<org.apache.flink.table.catalog.TableChange> tableChanges,
+      boolean ignoreIfNotExists)
+      throws TableNotExistException, CatalogException {
+    Table table = loadGravitinoTable(tablePath, ignoreIfNotExists);
+    if (table == null) {
+      return;
+    }
+    if (!FlinkGenericTableUtil.isGenericTableWhenLoad(table.properties())) {
+      super.alterTable(tablePath, newTable, tableChanges, ignoreIfNotExists);
+      return;
+    }
+    if (!(newTable instanceof ResolvedCatalogTable)) {
+      throw new CatalogException("Generic table must be a resolved catalog table");
+    }

Review Comment:
   The `applyGenericTableAlter` method ignores the `tableChanges` parameter in 
the second `alterTable` overload (line 180). While this appears intentional 
since generic tables re-serialize the entire schema into properties (line 219), 
it would be valuable to add a comment explaining this design decision.
   
   For example:
   ```java
   // For generic tables, we re-serialize the entire table schema and partition keys
   // into flink.* properties, so the individual tableChanges are not needed.
   // The newTable parameter contains the final state after applying all changes.
   ```
   
   This helps future maintainers understand why the parameter is unused and 
prevents potential confusion or incorrect modifications.
   ```suggestion
       }
       // For generic tables, we re-serialize the entire table schema and partition keys
       // into flink.* properties, so the individual tableChanges are not needed.
       // The newTable parameter contains the final state after applying all changes.
   ```
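Since the whole table state is re-serialized, the generic-table alter path reduces to a diff between two property maps: stale keys in the managed `flink.*` / `is_generic` namespace become remove-property changes, and new or changed keys become set-property changes. A self-contained sketch of that diff (plain strings stand in for Gravitino's `TableChange` objects):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class PropertyDiff {
    // Compute the changes needed to move from `current` to `updated`,
    // removing only keys in the managed flink.* / is_generic namespace.
    static List<String> diff(Map<String, String> current, Map<String, String> updated) {
        List<String> changes = new ArrayList<>();
        for (String key : current.keySet()) {
            boolean managed = key.startsWith("flink.") || key.equals("is_generic");
            if (managed && !updated.containsKey(key)) {
                changes.add("remove " + key);
            }
        }
        updated.forEach((key, value) -> {
            if (!value.equals(current.get(key))) {
                changes.add("set " + key + "=" + value);
            }
        });
        return changes;
    }

    public static void main(String[] args) {
        Map<String, String> current = Map.of("flink.schema.0.name", "id", "is_generic", "true");
        Map<String, String> updated = Map.of("flink.schema.0.name", "uid", "is_generic", "true");
        System.out.println(diff(current, updated)); // [set flink.schema.0.name=uid]
    }
}
```

Restricting removals to the managed namespace ensures user-set properties outside `flink.*` survive an alter that re-serializes the schema.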



##########
flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalog.java:
##########
@@ -67,4 +92,155 @@ public Optional<Factory> getFactory() {
   protected AbstractCatalog realCatalog() {
     return hiveCatalog;
   }
+
+  @Override
+  public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
+      throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+    Preconditions.checkArgument(
+        table instanceof ResolvedCatalogBaseTable, "table should be resolved");
+
+    if (!FlinkGenericTableUtil.isGenericTableWhenCreate(table.getOptions())) {
+      super.createTable(tablePath, table, ignoreIfExists);
+      return;
+    }
+
+    if (!(table instanceof ResolvedCatalogTable)) {
+      throw new CatalogException("Generic table must be a resolved catalog table");
+    }
+    ResolvedCatalogTable resolvedTable = (ResolvedCatalogTable) table;
+
+    NameIdentifier identifier =
+        NameIdentifier.of(tablePath.getDatabaseName(), tablePath.getObjectName());
+    Map<String, String> properties =
+        FlinkGenericTableUtil.toGravitinoGenericTableProperties(resolvedTable);
+
+    try {
+      catalog()
+          .asTableCatalog()
+          .createTable(
+              identifier,
+              new Column[0],
+              table.getComment(),
+              properties,
+              new Transform[0],
+              Distributions.NONE,
+              new SortOrder[0],
+              new Index[0]);
+    } catch (NoSuchSchemaException e) {
+      throw new DatabaseNotExistException(catalogName(), tablePath.getDatabaseName(), e);
+    } catch (TableAlreadyExistsException e) {
+      if (!ignoreIfExists) {
+        throw new TableAlreadyExistException(catalogName(), tablePath, e);
+      }
+    } catch (Exception e) {
+      throw new CatalogException(e);
+    }
+  }
+
+  @Override
+  public CatalogBaseTable getTable(ObjectPath tablePath)
+      throws TableNotExistException, CatalogException {
+    try {
+      Table table =
+          catalog()
+              .asTableCatalog()
+              .loadTable(NameIdentifier.of(tablePath.getDatabaseName(), tablePath.getObjectName()));
+      if (FlinkGenericTableUtil.isGenericTableWhenLoad(table.properties())) {
+        return FlinkGenericTableUtil.toFlinkGenericTable(table);
+      }
+      return super.toFlinkTable(table, tablePath);
+    } catch (NoSuchTableException e) {
+      throw new TableNotExistException(catalogName(), tablePath, e);
+    } catch (Exception e) {
+      throw new CatalogException(e);
+    }
+  }
+
+  @Override
+  public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
+      throws TableNotExistException, CatalogException {
+    Table table = loadGravitinoTable(tablePath, ignoreIfNotExists);
+    if (table == null) {
+      return;
+    }
+    if (!FlinkGenericTableUtil.isGenericTableWhenLoad(table.properties())) {
+      super.alterTable(tablePath, newTable, ignoreIfNotExists);
+      return;
+    }
+    if (!(newTable instanceof ResolvedCatalogTable)) {
+      throw new CatalogException("Generic table must be a resolved catalog table");
+    }
+    applyGenericTableAlter(tablePath, table, (ResolvedCatalogTable) newTable);
+  }
+
+  @Override
+  public void alterTable(
+      ObjectPath tablePath,
+      CatalogBaseTable newTable,
+      java.util.List<org.apache.flink.table.catalog.TableChange> tableChanges,
+      boolean ignoreIfNotExists)
+      throws TableNotExistException, CatalogException {
+    Table table = loadGravitinoTable(tablePath, ignoreIfNotExists);
+    if (table == null) {
+      return;
+    }
+    if (!FlinkGenericTableUtil.isGenericTableWhenLoad(table.properties())) {
+      super.alterTable(tablePath, newTable, tableChanges, ignoreIfNotExists);
+      return;
+    }
+    if (!(newTable instanceof ResolvedCatalogTable)) {
+      throw new CatalogException("Generic table must be a resolved catalog table");
+    }
+    applyGenericTableAlter(tablePath, table, (ResolvedCatalogTable) newTable);
+  }
+
+  private Table loadGravitinoTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+      throws TableNotExistException, CatalogException {
+    try {
+      return catalog()
+          .asTableCatalog()
+          .loadTable(NameIdentifier.of(tablePath.getDatabaseName(), tablePath.getObjectName()));
+    } catch (NoSuchTableException e) {
+      if (!ignoreIfNotExists) {
+        throw new TableNotExistException(catalogName(), tablePath, e);
+      }
+      return null;
+    } catch (Exception e) {
+      throw new CatalogException(e);
+    }
+  }
+
+  private void applyGenericTableAlter(
+      ObjectPath tablePath, Table existingTable, ResolvedCatalogTable newTable)
+      throws CatalogException {
+    NameIdentifier identifier =
+        NameIdentifier.of(tablePath.getDatabaseName(), tablePath.getObjectName());
+    Map<String, String> updatedProperties =
+        FlinkGenericTableUtil.toGravitinoGenericTableProperties(newTable);
+    Map<String, String> currentProperties =
+        existingTable.properties() == null ? Collections.emptyMap() : existingTable.properties();
+
+    List<TableChange> changes = new ArrayList<>();
+    if (!Objects.equals(existingTable.comment(), newTable.getComment())) {
+      changes.add(TableChange.updateComment(newTable.getComment()));
+    }
+
+    currentProperties.keySet().stream()
+        .filter(
+            key ->
+                (key.startsWith(CatalogPropertiesUtil.FLINK_PROPERTY_PREFIX)
+                        || CatalogPropertiesUtil.IS_GENERIC.equals(key))
+                    && !updatedProperties.containsKey(key))
+        .forEach(key -> changes.add(TableChange.removeProperty(key)));
+
+    updatedProperties.forEach(
+        (key, value) -> {
+          String currentValue = currentProperties.get(key);
+          if (!value.equals(currentValue)) {
+            changes.add(TableChange.setProperty(key, value));
+          }
+        });
+
+    catalog().asTableCatalog().alterTable(identifier, changes.toArray(new TableChange[0]));

Review Comment:
   The `applyGenericTableAlter` method calls 
`catalog().asTableCatalog().alterTable()` at line 244 without wrapping it in a 
try-catch block. This is inconsistent with other methods in this class:
   
   - `createTable` (lines 117-137) wraps the call in try-catch and converts 
exceptions to appropriate Flink exceptions
   - `getTable` (lines 143-156) wraps the call in try-catch and converts 
exceptions
   - `loadGravitinoTable` (lines 199-210) wraps the call in try-catch
   
   The `alterTable()` call could throw various exceptions (like 
`NoSuchTableException`, `IllegalArgumentException`, etc.) that should be 
converted to `CatalogException` or `TableNotExistException` for consistency 
with Flink's catalog API. Consider adding proper exception handling similar to 
other methods in this class.
   ```suggestion
       try {
      catalog().asTableCatalog().alterTable(identifier, changes.toArray(new TableChange[0]));
       } catch (NoSuchTableException e) {
         throw new CatalogException(e);
       } catch (RuntimeException e) {
         throw new CatalogException(e);
       }
   ```
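The translation the suggestion asks for follows the same wrap-and-rethrow pattern as the other methods in this class: catch the backend exception and rethrow a Flink catalog exception that preserves the cause. A minimal standalone sketch of the pattern (the exception types and the failing `alterTable` here are stand-ins, not the real Flink/Gravitino classes):

```java
public class ExceptionTranslation {
    // Stand-ins for Gravitino's NoSuchTableException and Flink's CatalogException.
    static class NoSuchTableException extends RuntimeException {
        NoSuchTableException(String msg) { super(msg); }
    }
    static class CatalogException extends RuntimeException {
        CatalogException(Throwable cause) { super(cause); }
    }

    // Simulated backend call that fails.
    static void alterTable(String name) {
        throw new NoSuchTableException("no such table: " + name);
    }

    // Wrap the backend call, translating any runtime failure into a
    // CatalogException so callers only see Flink's catalog API exceptions.
    static void alterTableTranslated(String name) {
        try {
            alterTable(name);
        } catch (RuntimeException e) {
            throw new CatalogException(e);
        }
    }

    public static void main(String[] args) {
        try {
            alterTableTranslated("t1");
        } catch (CatalogException e) {
            System.out.println(e.getCause().getMessage()); // no such table: t1
        }
    }
}
```

Keeping the original exception as the cause preserves the backend's diagnostic message while callers only need to handle the catalog API's exception types.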



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
