This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6d231b0366178a2219eb4bad5f386e2d73ff15a5 Author: JingsongLi <[email protected]> AuthorDate: Fri Dec 17 15:54:34 2021 +0800 [FLINK-25174][table] Introduce ManagedTableFactory --- .../src/test/resources/sql/catalog_database.q | 14 +-- .../src/test/resources/sql/table.q | 2 +- .../apache/flink/table/factories/FactoryUtil.java | 42 ++++++- .../flink/table/factories/ManagedTableFactory.java | 57 +++++++++ .../flink/table/factories/FactoryUtilTest.java | 10 +- .../table/factories/TestManagedTableFactory.java | 130 +++++++++++++++++++++ .../org.apache.flink.table.factories.Factory | 1 + .../flink/table/api/TableEnvironmentTest.scala | 8 +- 8 files changed, 245 insertions(+), 19 deletions(-) diff --git a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q index 9aa3fd0..71050f5 100644 --- a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q +++ b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q @@ -287,11 +287,11 @@ use catalog hivecatalog; [INFO] Execute statement succeed. !info -create table MyTable1 (a int, b string); +create table MyTable1 (a int, b string) with ('connector' = 'values'); [INFO] Execute statement succeed. !info -create table MyTable2 (a int, b string); +create table MyTable2 (a int, b string) with ('connector' = 'values'); [INFO] Execute statement succeed. !info @@ -330,11 +330,11 @@ show views; !ok # test create with full qualified name -create table c1.db1.MyTable3 (a int, b string); +create table c1.db1.MyTable3 (a int, b string) with ('connector' = 'values'); [INFO] Execute statement succeed. !info -create table c1.db1.MyTable4 (a int, b string); +create table c1.db1.MyTable4 (a int, b string) with ('connector' = 'values'); [INFO] Execute statement succeed. !info @@ -377,11 +377,11 @@ show views; !ok # test create with database name -create table `default`.MyTable5 (a int, b string); +create table `default`.MyTable5 (a int, b string) with ('connector' = 'values'); [INFO] Execute statement succeed. !info -create table `default`.MyTable6 (a int, b string); +create table `default`.MyTable6 (a int, b string) with ('connector' = 'values'); [INFO] Execute statement succeed. !info @@ -489,7 +489,7 @@ SET 'sql-client.execution.result-mode' = 'changelog'; [INFO] Session property has been set. !info -create table MyTable7 (a int, b string); +create table MyTable7 (a int, b string) with ('connector' = 'values'); [INFO] Execute statement succeed. !info diff --git a/flink-table/flink-sql-client/src/test/resources/sql/table.q b/flink-table/flink-sql-client/src/test/resources/sql/table.q index a016087..25aba02 100644 --- a/flink-table/flink-sql-client/src/test/resources/sql/table.q +++ b/flink-table/flink-sql-client/src/test/resources/sql/table.q @@ -494,7 +494,7 @@ drop temporary table tbl1; # test playing with keyword identifiers # ========================================================================== -create table `mod` (`table` string, `database` string); +create table `mod` (`table` string, `database` string) with ('connector' = 'values'); [INFO] Execute statement succeed. !info diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java index cd828de..0b751a6 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java @@ -64,6 +64,7 @@ import java.util.stream.StreamSupport; import static org.apache.flink.configuration.ConfigurationUtils.canBePrefixMap; import static org.apache.flink.configuration.ConfigurationUtils.filterPrefixMapKey; import static org.apache.flink.configuration.GlobalConfiguration.HIDDEN_CONTENT; +import static org.apache.flink.table.factories.ManagedTableFactory.DEFAULT_IDENTIFIER; import static org.apache.flink.table.module.CommonModuleOptions.MODULE_TYPE; /** Utility for working with {@link Factory}s. */ @@ -469,6 +470,7 @@ public final class FactoryUtil { factoryClass.getName(), foundFactories.stream() .map(Factory::factoryIdentifier) + .filter(identifier -> !DEFAULT_IDENTIFIER.equals(identifier)) .distinct() .sorted() .collect(Collectors.joining("\n")))); @@ -620,10 +622,7 @@ public final class FactoryUtil { Class<T> factoryClass, DynamicTableFactory.Context context) { final String connectorOption = context.getCatalogTable().getOptions().get(CONNECTOR.key()); if (connectorOption == null) { - throw new ValidationException( - String.format( - "Table options do not contain an option key '%s' for discovering a connector.", - CONNECTOR.key())); + return discoverManagedTableFactory(context.getClassLoader(), factoryClass); } try { return discoverFactory(context.getClassLoader(), factoryClass, connectorOption); @@ -687,6 +686,41 @@ public final class FactoryUtil { } } + @SuppressWarnings("unchecked") + static <T extends DynamicTableFactory> T discoverManagedTableFactory( + ClassLoader classLoader, Class<T> implementClass) { + final List<Factory> factories = discoverFactories(classLoader); + + final List<Factory> foundFactories = + factories.stream() + .filter(f -> ManagedTableFactory.class.isAssignableFrom(f.getClass())) + .filter(f -> implementClass.isAssignableFrom(f.getClass())) + .collect(Collectors.toList()); + + if (foundFactories.isEmpty()) { + throw new ValidationException( + String.format( + "Table options do not contain an option key 'connector' for discovering a connector. " + + "Therefore, Flink assumes a managed table. However, a managed table factory " + + "that implements %s is not in the classpath.", + implementClass.getName())); + } + + if (foundFactories.size() > 1) { + throw new ValidationException( + String.format( + "Multiple factories for managed table found in the classpath.\n\n" + + "Ambiguous factory classes are:\n\n" + + "%s", + foundFactories.stream() + .map(f -> f.getClass().getName()) + .sorted() + .collect(Collectors.joining("\n")))); + } + + return (T) foundFactories.get(0); + } + static List<Factory> discoverFactories(ClassLoader classLoader) { final List<Factory> result = new LinkedList<>(); ServiceLoaderUtil.load(Factory.class, classLoader) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ManagedTableFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ManagedTableFactory.java new file mode 100644 index 0000000..6dcba28 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ManagedTableFactory.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import org.apache.flink.annotation.Internal; + +import java.util.Map; + +/** + * Base interface for configuring a managed dynamic table connector. The managed table factory is + * used when there is no {@link FactoryUtil#CONNECTOR} option. + */ +@Internal +public interface ManagedTableFactory extends DynamicTableFactory { + + /** {@link #factoryIdentifier()} for the managed table factory. */ + String DEFAULT_IDENTIFIER = "default"; + + @Override + default String factoryIdentifier() { + return DEFAULT_IDENTIFIER; + } + + /** + * Enrich options from catalog and session information. + * + * @return new options of this table. + */ + Map<String, String> enrichOptions(Context context); + + /** Notifies the listener that a table creation occurred. */ + void onCreateTable(Context context, boolean ignoreIfExists); + + /** Notifies the listener that a table drop occurred. */ + void onDropTable(Context context, boolean ignoreIfNotExists); + + /** Discovers the unique implementation of {@link ManagedTableFactory} without identifier. */ + static ManagedTableFactory discoverManagedTableFactory(ClassLoader classLoader) { + return FactoryUtil.discoverManagedTableFactory(classLoader, ManagedTableFactory.class); + } +} diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java index c33d24f..2cc4b5f 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java @@ -59,10 +59,12 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; public class FactoryUtilTest { @Test - public void testMissingConnector() { - assertCreateTableSourceWithOptionModifier( - options -> options.remove("connector"), - "Table options do not contain an option key 'connector' for discovering a connector."); + public void testManagedConnector() { + final Map<String, String> options = createAllOptions(); + options.remove("connector"); + final DynamicTableSource actualSource = createTableSource(SCHEMA, options); + assertThat(actualSource) + .isExactlyInstanceOf(TestManagedTableFactory.TestManagedTableSource.class); } @Test diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestManagedTableFactory.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestManagedTableFactory.java new file mode 100644 index 0000000..c45a57e --- /dev/null +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestManagedTableFactory.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; + +/** A test {@link ManagedTableFactory}. */ +public class TestManagedTableFactory implements DynamicTableSourceFactory, ManagedTableFactory { + + public static final String ENRICHED_KEY = "ENRICHED_KEY"; + + public static final String ENRICHED_VALUE = "ENRICHED_VALUE"; + + public static final Map<ObjectIdentifier, AtomicReference<Map<String, String>>> MANAGED_TABLES = + new ConcurrentHashMap<>(); + + @Override + public Set<ConfigOption<?>> requiredOptions() { + return new HashSet<>(); + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + return new HashSet<>(); + } + + @Override + public Map<String, String> enrichOptions(Context context) { + Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions()); + if (MANAGED_TABLES.containsKey(context.getObjectIdentifier())) { + newOptions.put(ENRICHED_KEY, ENRICHED_VALUE); + } + return newOptions; + } + + @Override + public void onCreateTable(Context context, boolean ignoreIfExists) { + MANAGED_TABLES.compute( + context.getObjectIdentifier(), + (k, v) -> { + if (v != null) { + if (v.get() == null) { + v.set(context.getCatalogTable().getOptions()); + } else if (!ignoreIfExists) { + throw new TableException("Table exists."); + } + } + return v; + }); + } + + @Override + public void onDropTable(Context context, boolean ignoreIfNotExists) { + AtomicReference<Map<String, String>> reference = + MANAGED_TABLES.get(context.getObjectIdentifier()); + if (reference != null) { + Map<String, String> previous = reference.getAndSet(null); + if (!context.getCatalogTable().getOptions().equals(previous) && !ignoreIfNotExists) { + throw new TableException("Table does not exist."); + } + } + } + + @Override + public DynamicTableSource createDynamicTableSource(Context context) { + return new TestManagedTableSource(); + } + + /** Managed {@link DynamicTableSource} for testing. */ + public static class TestManagedTableSource implements ScanTableSource { + + @Override + public ChangelogMode getChangelogMode() { + throw new UnsupportedOperationException(); + } + + @Override + public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { + throw new UnsupportedOperationException(); + } + + @Override + public DynamicTableSource copy() { + throw new UnsupportedOperationException(); + } + + @Override + public String asSummaryString() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean equals(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public int hashCode() { + throw new UnsupportedOperationException(); + } + } +} diff --git a/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory index 8f50817..c969661 100644 --- a/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -22,4 +22,5 @@ org.apache.flink.table.factories.TestDynamicTableSourceOnlyFactory org.apache.flink.table.factories.TestConflictingDynamicTableFactory1 org.apache.flink.table.factories.TestConflictingDynamicTableFactory2 org.apache.flink.table.factories.TestCatalogFactory +org.apache.flink.table.factories.TestManagedTableFactory org.apache.flink.table.factories.module.DummyModuleFactory diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala index 2833f1d..0ec319d 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala @@ -258,16 +258,18 @@ class TableEnvironmentTest { | b int, | c varchar |) WITH ( - | 'connector' = 'COLLECTION' + | 'connector' = 'filesystem', + | 'format' = 'testcsv', + | 'path' = '_invalid' |) """.stripMargin tableEnv.executeSql(statement) - val alterTableResetStatement = "ALTER TABLE MyTable RESET ('connector')" + val alterTableResetStatement = "ALTER TABLE MyTable RESET ('format')" val tableResult = tableEnv.executeSql(alterTableResetStatement) assertEquals(ResultKind.SUCCESS, tableResult.getResultKind) assertEquals( - Map.empty.asJava, + Map("connector" -> "filesystem", "path" -> "_invalid").asJava, tableEnv.getCatalog(tableEnv.getCurrentCatalog).get() .getTable(ObjectPath.fromString(s"${tableEnv.getCurrentDatabase}.MyTable")).getOptions) expectedException.expect(classOf[ValidationException])
