twalthr commented on a change in pull request #15245:
URL: https://github.com/apache/flink/pull/15245#discussion_r595820374



##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/CatalogFactory.java
##########
@@ -19,22 +19,103 @@
 package org.apache.flink.table.factories;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
 
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * A factory to create configured catalog instances based on string-based 
properties. See also
- * {@link TableFactory} for more information.
+ * {@link Factory} for more information.
+ *
+ * <p>Note that this interface supports the {@link TableFactory} stack for 
compatibility purposes.
+ * This is deprecated, however, and new implementations should implement the 
{@link Factory} stack
+ * instead.
  */
 @PublicEvolving
-public interface CatalogFactory extends TableFactory {
+public interface CatalogFactory extends TableFactory, Factory {
 
     /**
      * Creates and configures a {@link Catalog} using the given properties.
      *
      * @param properties normalized properties describing an external catalog.
      * @return the configured catalog.
+     * @deprecated Use {@link this#createCatalog(Context)} instead and 
implement {@link Factory}
+     *     instead of {@link TableFactory}.
+     */
+    @Deprecated
+    default Catalog createCatalog(String name, Map<String, String> properties) 
{
+        throw new CatalogException("Catalog factories must implement 
createCatalog()");
+    }
+
+    /**
+     * Creates and configures a {@link Catalog} using the given context.
+     *
+     * <p>An implementation should perform validation and the discovery of 
further (nested)
+     * factories in this method.
      */
-    Catalog createCatalog(String name, Map<String, String> properties);
+    default Catalog createCatalog(Context context) {
+        throw new CatalogException("Catalog factories must implement 
createCatalog(Context)");
+    }
+
+    /** Context provided when a catalog is created. */
+    interface Context {
+        /** Returns the name with which the catalog is created. */
+        String getName();
+
+        /**
+         * Returns the properties with which the catalog is created.
+         *
+         * <p>An implementation should perform validation of these properties.
+         */
+        Map<String, String> getProperties();

Review comment:
       we should call this options for consistency. we also call it options at 
other locations.

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -200,6 +209,48 @@ public static TableFactoryHelper createTableFactoryHelper(
         return new TableFactoryHelper(factory, context);
     }
 
+    /**
+     * Attempts to discover an appropriate catalog factory and creates an 
instance of the catalog.
+     *
+     * <p>This first uses the legacy {@link TableFactory} stack to discover a 
matching {@link
+     * CatalogFactory}. If none is found, it falls back to the new stack using 
{@link Factory}
+     * instead.
+     */
+    public static Catalog createCatalog(
+            String catalogName,
+            Map<String, String> properties,
+            ReadableConfig configuration,
+            ClassLoader classLoader) {
+        // Use the legacy mechanism first for compatibility
+        try {
+            final CatalogFactory legacyFactory =
+                    TableFactoryService.find(CatalogFactory.class, properties, 
classLoader);
+            return legacyFactory.createCatalog(catalogName, properties);
+        } catch (NoMatchingTableFactoryException e) {
+            // No matching legacy factory found, try using the new stack
+
+            final DefaultCatalogContext context =
+                    new DefaultCatalogContext(catalogName, properties, 
configuration, classLoader);

Review comment:
       the `type` should not be part of the properties anymore, it should only 
be used for factory discovery. otherwise every factory needs to implement logic 
for skipping this property during validation.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to