This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6d231b0366178a2219eb4bad5f386e2d73ff15a5
Author: JingsongLi <[email protected]>
AuthorDate: Fri Dec 17 15:54:34 2021 +0800

    [FLINK-25174][table] Introduce ManagedTableFactory
---
 .../src/test/resources/sql/catalog_database.q      |  14 +--
 .../src/test/resources/sql/table.q                 |   2 +-
 .../apache/flink/table/factories/FactoryUtil.java  |  42 ++++++-
 .../flink/table/factories/ManagedTableFactory.java |  57 +++++++++
 .../flink/table/factories/FactoryUtilTest.java     |  10 +-
 .../table/factories/TestManagedTableFactory.java   | 130 +++++++++++++++++++++
 .../org.apache.flink.table.factories.Factory       |   1 +
 .../flink/table/api/TableEnvironmentTest.scala     |   8 +-
 8 files changed, 245 insertions(+), 19 deletions(-)

diff --git a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q
index 9aa3fd0..71050f5 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q
@@ -287,11 +287,11 @@ use catalog hivecatalog;
 [INFO] Execute statement succeed.
 !info
 
-create table MyTable1 (a int, b string);
+create table MyTable1 (a int, b string) with ('connector' = 'values');
 [INFO] Execute statement succeed.
 !info
 
-create table MyTable2 (a int, b string);
+create table MyTable2 (a int, b string) with ('connector' = 'values');
 [INFO] Execute statement succeed.
 !info
 
@@ -330,11 +330,11 @@ show views;
 !ok
 
 # test create with full qualified name
-create table c1.db1.MyTable3 (a int, b string);
+create table c1.db1.MyTable3 (a int, b string) with ('connector' = 'values');
 [INFO] Execute statement succeed.
 !info
 
-create table c1.db1.MyTable4 (a int, b string);
+create table c1.db1.MyTable4 (a int, b string) with ('connector' = 'values');
 [INFO] Execute statement succeed.
 !info
 
@@ -377,11 +377,11 @@ show views;
 !ok
 
 # test create with database name
-create table `default`.MyTable5 (a int, b string);
+create table `default`.MyTable5 (a int, b string) with ('connector' = 'values');
 [INFO] Execute statement succeed.
 !info
 
-create table `default`.MyTable6 (a int, b string);
+create table `default`.MyTable6 (a int, b string) with ('connector' = 'values');
 [INFO] Execute statement succeed.
 !info
 
@@ -489,7 +489,7 @@ SET 'sql-client.execution.result-mode' = 'changelog';
 [INFO] Session property has been set.
 !info
 
-create table MyTable7 (a int, b string);
+create table MyTable7 (a int, b string) with ('connector' = 'values');
 [INFO] Execute statement succeed.
 !info
 
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/table.q b/flink-table/flink-sql-client/src/test/resources/sql/table.q
index a016087..25aba02 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/table.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/table.q
@@ -494,7 +494,7 @@ drop temporary table tbl1;
 # test playing with keyword identifiers
 # ==========================================================================
 
-create table `mod` (`table` string, `database` string);
+create table `mod` (`table` string, `database` string) with ('connector' = 'values');
 [INFO] Execute statement succeed.
 !info
 
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
index cd828de..0b751a6 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
@@ -64,6 +64,7 @@ import java.util.stream.StreamSupport;
 import static org.apache.flink.configuration.ConfigurationUtils.canBePrefixMap;
 import static org.apache.flink.configuration.ConfigurationUtils.filterPrefixMapKey;
 import static org.apache.flink.configuration.GlobalConfiguration.HIDDEN_CONTENT;
+import static org.apache.flink.table.factories.ManagedTableFactory.DEFAULT_IDENTIFIER;
 import static org.apache.flink.table.module.CommonModuleOptions.MODULE_TYPE;
 
 /** Utility for working with {@link Factory}s. */
@@ -469,6 +470,7 @@ public final class FactoryUtil {
                             factoryClass.getName(),
                             foundFactories.stream()
                                     .map(Factory::factoryIdentifier)
+                                    .filter(identifier -> !DEFAULT_IDENTIFIER.equals(identifier))
                                     .distinct()
                                     .sorted()
                                     .collect(Collectors.joining("\n"))));
@@ -620,10 +622,7 @@ public final class FactoryUtil {
             Class<T> factoryClass, DynamicTableFactory.Context context) {
         final String connectorOption = context.getCatalogTable().getOptions().get(CONNECTOR.key());
         if (connectorOption == null) {
-            throw new ValidationException(
-                    String.format(
-                            "Table options do not contain an option key '%s' for discovering a connector.",
-                            CONNECTOR.key()));
+            return discoverManagedTableFactory(context.getClassLoader(), factoryClass);
         }
         try {
             return discoverFactory(context.getClassLoader(), factoryClass, connectorOption);
@@ -687,6 +686,41 @@ public final class FactoryUtil {
         }
     }
 
+    @SuppressWarnings("unchecked")
+    static <T extends DynamicTableFactory> T discoverManagedTableFactory(
+            ClassLoader classLoader, Class<T> implementClass) {
+        final List<Factory> factories = discoverFactories(classLoader);
+
+        final List<Factory> foundFactories =
+                factories.stream()
+                        .filter(f -> ManagedTableFactory.class.isAssignableFrom(f.getClass()))
+                        .filter(f -> implementClass.isAssignableFrom(f.getClass()))
+                        .collect(Collectors.toList());
+
+        if (foundFactories.isEmpty()) {
+            throw new ValidationException(
+                    String.format(
+                            "Table options do not contain an option key 'connector' for discovering a connector. "
+                                    + "Therefore, Flink assumes a managed table. However, a managed table factory "
+                                    + "that implements %s is not in the classpath.",
+                            implementClass.getName()));
+        }
+
+        if (foundFactories.size() > 1) {
+            throw new ValidationException(
+                    String.format(
+                            "Multiple factories for managed table found in the classpath.\n\n"
+                                    + "Ambiguous factory classes are:\n\n"
+                                    + "%s",
+                            foundFactories.stream()
+                                    .map(f -> f.getClass().getName())
+                                    .sorted()
+                                    .collect(Collectors.joining("\n"))));
+        }
+
+        return (T) foundFactories.get(0);
+    }
+
     static List<Factory> discoverFactories(ClassLoader classLoader) {
         final List<Factory> result = new LinkedList<>();
         ServiceLoaderUtil.load(Factory.class, classLoader)
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ManagedTableFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ManagedTableFactory.java
new file mode 100644
index 0000000..6dcba28
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ManagedTableFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.Map;
+
+/**
+ * Base interface for configuring a managed dynamic table connector. The managed table factory is
+ * used when there is no {@link FactoryUtil#CONNECTOR} option.
+ */
+@Internal
+public interface ManagedTableFactory extends DynamicTableFactory {
+
+    /** {@link #factoryIdentifier()} for the managed table factory. */
+    String DEFAULT_IDENTIFIER = "default";
+
+    @Override
+    default String factoryIdentifier() {
+        return DEFAULT_IDENTIFIER;
+    }
+
+    /**
+     * Enrich options from catalog and session information.
+     *
+     * @return new options of this table.
+     */
+    Map<String, String> enrichOptions(Context context);
+
+    /** Notifies the listener that a table creation occurred. */
+    void onCreateTable(Context context, boolean ignoreIfExists);
+
+    /** Notifies the listener that a table drop occurred. */
+    void onDropTable(Context context, boolean ignoreIfNotExists);
+
+    /** Discovers the unique implementation of {@link ManagedTableFactory} without identifier. */
+    static ManagedTableFactory discoverManagedTableFactory(ClassLoader classLoader) {
+        return FactoryUtil.discoverManagedTableFactory(classLoader, ManagedTableFactory.class);
+    }
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
index c33d24f..2cc4b5f 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
@@ -59,10 +59,12 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 public class FactoryUtilTest {
 
     @Test
-    public void testMissingConnector() {
-        assertCreateTableSourceWithOptionModifier(
-                options -> options.remove("connector"),
-                "Table options do not contain an option key 'connector' for discovering a connector.");
+    public void testManagedConnector() {
+        final Map<String, String> options = createAllOptions();
+        options.remove("connector");
+        final DynamicTableSource actualSource = createTableSource(SCHEMA, options);
+        assertThat(actualSource)
+                .isExactlyInstanceOf(TestManagedTableFactory.TestManagedTableSource.class);
     }
 
     @Test
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestManagedTableFactory.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestManagedTableFactory.java
new file mode 100644
index 0000000..c45a57e
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestManagedTableFactory.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+/** A test {@link ManagedTableFactory}. */
+public class TestManagedTableFactory implements DynamicTableSourceFactory, ManagedTableFactory {
+
+    public static final String ENRICHED_KEY = "ENRICHED_KEY";
+
+    public static final String ENRICHED_VALUE = "ENRICHED_VALUE";
+
+    public static final Map<ObjectIdentifier, AtomicReference<Map<String, String>>> MANAGED_TABLES =
+            new ConcurrentHashMap<>();
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        return new HashSet<>();
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        return new HashSet<>();
+    }
+
+    @Override
+    public Map<String, String> enrichOptions(Context context) {
+        Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions());
+        if (MANAGED_TABLES.containsKey(context.getObjectIdentifier())) {
+            newOptions.put(ENRICHED_KEY, ENRICHED_VALUE);
+        }
+        return newOptions;
+    }
+
+    @Override
+    public void onCreateTable(Context context, boolean ignoreIfExists) {
+        MANAGED_TABLES.compute(
+                context.getObjectIdentifier(),
+                (k, v) -> {
+                    if (v != null) {
+                        if (v.get() == null) {
+                            v.set(context.getCatalogTable().getOptions());
+                        } else if (!ignoreIfExists) {
+                            throw new TableException("Table exists.");
+                        }
+                    }
+                    return v;
+                });
+    }
+
+    @Override
+    public void onDropTable(Context context, boolean ignoreIfNotExists) {
+        AtomicReference<Map<String, String>> reference =
+                MANAGED_TABLES.get(context.getObjectIdentifier());
+        if (reference != null) {
+            Map<String, String> previous = reference.getAndSet(null);
+            if (!context.getCatalogTable().getOptions().equals(previous) && !ignoreIfNotExists) {
+                throw new TableException("Table does not exist.");
+            }
+        }
+    }
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        return new TestManagedTableSource();
+    }
+
+    /** Managed {@link DynamicTableSource} for testing. */
+    public static class TestManagedTableSource implements ScanTableSource {
+
+        @Override
+        public ChangelogMode getChangelogMode() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public DynamicTableSource copy() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public String asSummaryString() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int hashCode() {
+            throw new UnsupportedOperationException();
+        }
+    }
+}
diff --git a/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 8f50817..c969661 100644
--- a/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -22,4 +22,5 @@ org.apache.flink.table.factories.TestDynamicTableSourceOnlyFactory
 org.apache.flink.table.factories.TestConflictingDynamicTableFactory1
 org.apache.flink.table.factories.TestConflictingDynamicTableFactory2
 org.apache.flink.table.factories.TestCatalogFactory
+org.apache.flink.table.factories.TestManagedTableFactory
 org.apache.flink.table.factories.module.DummyModuleFactory
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index 2833f1d..0ec319d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -258,16 +258,18 @@ class TableEnvironmentTest {
         |  b int,
         |  c varchar
         |) WITH (
-        |  'connector' = 'COLLECTION'
+        |  'connector' = 'filesystem',
+        |  'format' = 'testcsv',
+        |  'path' = '_invalid'
         |)
       """.stripMargin
     tableEnv.executeSql(statement)
 
-    val alterTableResetStatement = "ALTER TABLE MyTable RESET ('connector')"
+    val alterTableResetStatement = "ALTER TABLE MyTable RESET ('format')"
     val tableResult = tableEnv.executeSql(alterTableResetStatement)
     assertEquals(ResultKind.SUCCESS, tableResult.getResultKind)
     assertEquals(
-      Map.empty.asJava,
+      Map("connector" -> "filesystem", "path" -> "_invalid").asJava,
       tableEnv.getCatalog(tableEnv.getCurrentCatalog).get()
         .getTable(ObjectPath.fromString(s"${tableEnv.getCurrentDatabase}.MyTable")).getOptions)
     expectedException.expect(classOf[ValidationException])

Reply via email to