FANNG1 commented on code in PR #10517:
URL: https://github.com/apache/gravitino/pull/10517#discussion_r2988177122


##########
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalog.java:
##########
@@ -252,4 +251,13 @@ private void applyGenericTableAlter(
       throw new CatalogException(e);
     }
   }
+
+  protected Map<String, String> toGravitinoGenericTableProperties(
+      ResolvedCatalogTable resolvedTable) {
+    return 
FlinkGenericTableUtil.toGravitinoGenericTableProperties(resolvedTable, 
catalogCompat());
+  }
+
+  protected CatalogTable toFlinkGenericTable(Table table) {
+    return FlinkGenericTableUtil.toFlinkGenericTable(table, catalogCompat());
+  }

Review Comment:
   I prefer to keep these thin wrapper methods on the catalog class. They 
delegate to `FlinkGenericTableUtil`, but they also bind the call to the catalog 
instance so the version-specific `catalogCompat()` hook can be applied 
naturally by subclasses such as the Flink 1.18 catalog. Moving them back to the 
util class would make the compat path more scattered rather than simpler.



##########
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalogFactoryOptions.java:
##########
@@ -30,6 +30,20 @@ public class GravitinoJdbcCatalogFactoryOptions {
   /** Identifier for the {@link GravitinoJdbcCatalog}. */
   public static final String POSTGRESQL_IDENTIFIER = 
"gravitino-jdbc-postgresql";
 
+  public static final ConfigOption<String> BASE_URL =
+      
ConfigOptions.key(JdbcPropertiesConstants.FLINK_JDBC_URL).stringType().noDefaultValue();
+
+  public static final ConfigOption<String> USERNAME =
+      
ConfigOptions.key(JdbcPropertiesConstants.FLINK_JDBC_USER).stringType().noDefaultValue();
+
+  public static final ConfigOption<String> PASSWORD =
+      
ConfigOptions.key(JdbcPropertiesConstants.FLINK_JDBC_PASSWORD).stringType().noDefaultValue();
+
   public static final ConfigOption<String> DEFAULT_DATABASE =
-      ConfigOptions.key("default-database").stringType().noDefaultValue();
+      ConfigOptions.key(JdbcPropertiesConstants.FLINK_JDBC_DEFAULT_DATABASE)
+          .stringType()
+          .noDefaultValue();
+
+  public static final ConfigOption<String> DRIVER =
+      
ConfigOptions.key(JdbcPropertiesConstants.FLINK_DRIVER).stringType().noDefaultValue();
 }

Review Comment:
   I kept these options here intentionally. This does not introduce a new 
user-facing requirement compared with the old implementation; it centralizes 
the Flink-side JDBC option definitions so `requiredOptions()`, 
`optionalOptions()`, and option lookups use the same source of truth after the 
versioned refactor. That keeps the factory logic clearer and avoids scattering 
the option keys across the implementation.



##########
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java:
##########
@@ -570,7 +572,21 @@ protected CatalogBaseTable toFlinkTable(Table table, 
ObjectPath tablePath) {
         schemaAndTablePropertiesConverter.toFlinkTableProperties(
             catalogOptions, table.properties(), tablePath);
     List<String> partitionKeys = 
partitionConverter.toFlinkPartitionKeys(table.partitioning());
-    return CatalogTable.of(builder.build(), table.comment(), partitionKeys, 
flinkTableProperties);
+    return newCatalogTable(builder.build(), table.comment(), partitionKeys, 
flinkTableProperties);
+  }
+
+  protected CatalogTable newCatalogTable(
+      org.apache.flink.table.api.Schema schema,
+      String comment,
+      List<String> partitionKeys,
+      Map<String, String> options) {
+    return catalogCompat().createCatalogTable(schema, comment, partitionKeys, 
options);
+  }
+
+  protected CatalogCompat catalogCompat() {
+    // Versioned catalog entry classes override this hook when the Flink minor 
has a different
+    // catalog/table API path.
+    return DefaultCatalogCompat.INSTANCE;

Review Comment:
   I do not think this needs a code change in this PR. The current structure 
uses a shared common layer plus version-specific adapters/hooks 
(`catalogCompat()` and the versioned catalog/factory classes) to isolate Flink 
minor-version API differences. That is the main reason for this split: keep the 
common behavior in one place and localize the Flink-version-specific parts in 
each version module.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to