EronWright closed pull request #7390: [FLINK-11237] [table] External Catalog 
Factory and Descriptor
URL: https://github.com/apache/flink/pull/7390
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index fe438b6871e..c02de7c0196 100644
--- 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -166,7 +166,7 @@ public T getClusterId() {
 
        public EnvironmentInstance createEnvironmentInstance() {
                try {
-                       return new EnvironmentInstance();
+                       return wrapClassLoader(EnvironmentInstance::new);
                } catch (Throwable t) {
                        // catch everything such that a wrong environment does 
not affect invocations
                        throw new SqlExecutionException("Could not create 
environment instance.", t);
diff --git 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 36fd4ee0a63..060ac4185ad 100644
--- 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -193,27 +193,31 @@ public void start() {
 
        @Override
        public List<String> listTables(SessionContext session) throws 
SqlExecutionException {
-               final TableEnvironment tableEnv = 
getOrCreateExecutionContext(session)
+               final ExecutionContext<?> context = 
getOrCreateExecutionContext(session);
+               final TableEnvironment tableEnv = context
                        .createEnvironmentInstance()
                        .getTableEnvironment();
-               return Arrays.asList(tableEnv.listTables());
+               return context.wrapClassLoader(() -> 
Arrays.asList(tableEnv.listTables()));
        }
 
        @Override
        public List<String> listUserDefinedFunctions(SessionContext session) 
throws SqlExecutionException {
-               final TableEnvironment tableEnv = 
getOrCreateExecutionContext(session)
+               final ExecutionContext<?> context = 
getOrCreateExecutionContext(session);
+               final TableEnvironment tableEnv = context
                        .createEnvironmentInstance()
                        .getTableEnvironment();
-               return Arrays.asList(tableEnv.listUserDefinedFunctions());
+               return context.wrapClassLoader(() -> 
Arrays.asList(tableEnv.listUserDefinedFunctions()));
        }
 
        @Override
        public TableSchema getTableSchema(SessionContext session, String name) 
throws SqlExecutionException {
-               final TableEnvironment tableEnv = 
getOrCreateExecutionContext(session)
+               final ExecutionContext<?> context = 
getOrCreateExecutionContext(session);
+               final TableEnvironment tableEnv = context
                        .createEnvironmentInstance()
                        .getTableEnvironment();
                try {
-                       return tableEnv.scan(name).getSchema();
+                       // scanning requires table resolution step that might 
reference external tables
+                       return context.wrapClassLoader(() -> 
tableEnv.scan(name).getSchema());
                } catch (Throwable t) {
                        // catch everything such that the query does not crash 
the executor
                        throw new SqlExecutionException("No table with this 
name could be found.", t);
@@ -229,7 +233,7 @@ public String explainStatement(SessionContext session, 
String statement) throws
 
                // translate
                try {
-                       final Table table = createTable(tableEnv, statement);
+                       final Table table = createTable(context, tableEnv, 
statement);
                        // explanation requires an optimization step that might 
reference UDFs during code compilation
                        return context.wrapClassLoader(() -> 
tableEnv.explain(table));
                } catch (Throwable t) {
@@ -240,12 +244,14 @@ public String explainStatement(SessionContext session, 
String statement) throws
 
        @Override
        public List<String> completeStatement(SessionContext session, String 
statement, int position) {
-               final TableEnvironment tableEnv = 
getOrCreateExecutionContext(session)
+               final ExecutionContext<?> context = 
getOrCreateExecutionContext(session);
+               final TableEnvironment tableEnv = context
                                .createEnvironmentInstance()
                                .getTableEnvironment();
 
                try {
-                       return 
Arrays.asList(tableEnv.getCompletionHints(statement, position));
+                       // planning requires table resolution step that might 
reference external tables
+                       return context.wrapClassLoader(() -> 
Arrays.asList(tableEnv.getCompletionHints(statement, position)));
                } catch (Throwable t) {
                        // catch everything such that the query does not crash 
the executor
                        if (LOG.isDebugEnabled()) {
@@ -402,7 +408,7 @@ public void stop(SessionContext session) {
                final ExecutionContext.EnvironmentInstance envInst = 
context.createEnvironmentInstance();
 
                // create table
-               final Table table = createTable(envInst.getTableEnvironment(), 
query);
+               final Table table = createTable(context, 
envInst.getTableEnvironment(), query);
 
                // initialize result
                final DynamicResult<C> result = resultStore.createResult(
@@ -448,10 +454,11 @@ public void stop(SessionContext session) {
        /**
         * Creates a table using the given query in the given table environment.
         */
-       private Table createTable(TableEnvironment tableEnv, String 
selectQuery) {
+       private <C> Table createTable(ExecutionContext<C> context, 
TableEnvironment tableEnv, String selectQuery) {
                // parse and validate query
                try {
-                       return tableEnv.sqlQuery(selectQuery);
+                       // query statement requires table resolution step that 
might reference external tables
+                       return context.wrapClassLoader(() -> 
tableEnv.sqlQuery(selectQuery));
                } catch (Throwable t) {
                        // catch everything such that the query does not crash 
the executor
                        throw new SqlExecutionException("Invalid SQL 
statement.", t);
diff --git 
a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorDescriptor.java
 
b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorDescriptor.java
index a1d3e783cdc..ed62b46b36b 100644
--- 
a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorDescriptor.java
+++ 
b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorDescriptor.java
@@ -68,7 +68,7 @@ protected final boolean isFormatNeeded() {
 
        /**
         * Converts this descriptor into a set of connector properties. Usually 
prefixed with
-        * {@link FormatDescriptorValidator#FORMAT}.
+        * {@link ConnectorDescriptorValidator#CONNECTOR}.
         */
        protected abstract Map<String, String> toConnectorProperties();
 }
diff --git 
a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.java
 
b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.java
index 88cf34905f4..cf9860ae69e 100644
--- 
a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.java
+++ 
b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.java
@@ -26,6 +26,11 @@
 @Internal
 public abstract class ConnectorDescriptorValidator implements 
DescriptorValidator {
 
+       /**
+        * Prefix for connector-related properties.
+        */
+       public static final String CONNECTOR = "connector";
+
        /**
         * Key for describing the type of the connector. Usually used for 
factory discovery.
         */
diff --git 
a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/descriptors/ExternalCatalogDescriptor.java
 
b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/descriptors/ExternalCatalogDescriptor.java
new file mode 100644
index 00000000000..5ee7f06a9dd
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/descriptors/ExternalCatalogDescriptor.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Map;
+
+import static  
org.apache.flink.table.descriptors.ExternalCatalogDescriptorValidator.CATALOG_PROPERTY_VERSION;
+import static  
org.apache.flink.table.descriptors.ExternalCatalogDescriptorValidator.CATALOG_TYPE;
+
+/**
+ * Describes an external catalog of tables, views, and functions.
+ */
+@PublicEvolving
+public abstract class ExternalCatalogDescriptor extends DescriptorBase 
implements Descriptor {
+
+       private final String type;
+
+       private final int version;
+
+       /**
+        * Constructs an {@link ExternalCatalogDescriptor}.
+        *
+        * @param type string that identifies this catalog
+        * @param version property version for backwards compatibility
+        */
+       public ExternalCatalogDescriptor(String type, int version) {
+               this.type = type;
+               this.version = version;
+       }
+
+       @Override
+       public final Map<String, String> toProperties() {
+               final DescriptorProperties properties = new 
DescriptorProperties();
+               properties.putString(CATALOG_TYPE, type);
+               properties.putLong(CATALOG_PROPERTY_VERSION, version);
+               properties.putProperties(toCatalogProperties());
+               return properties.asMap();
+       }
+
+       /**
+        * Converts this descriptor into a set of catalog properties.
+        */
+       protected abstract Map<String, String> toCatalogProperties();
+}
diff --git 
a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/descriptors/ExternalCatalogDescriptorValidator.java
 
b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/descriptors/ExternalCatalogDescriptorValidator.java
new file mode 100644
index 00000000000..6cee73c5a16
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/descriptors/ExternalCatalogDescriptorValidator.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Validator for {@link ExternalCatalogDescriptor}.
+ */
+@Internal
+public abstract class ExternalCatalogDescriptorValidator implements 
DescriptorValidator {
+
+       /**
+        * Key for describing the type of the catalog. Usually used for factory 
discovery.
+        */
+       public static final String CATALOG_TYPE = "type";
+
+       /**
+        * Key for describing the property version. This property can be used 
for backwards
+        * compatibility in case the property format changes.
+        */
+       public static final String CATALOG_PROPERTY_VERSION = 
"property-version";
+
+       @Override
+       public void validate(DescriptorProperties properties) {
+               properties.validateString(CATALOG_TYPE, false, 1);
+               properties.validateInt(CATALOG_PROPERTY_VERSION, true, 0);
+       }
+}
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
index 45414ee3ff6..ce57070ac98 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
@@ -270,8 +270,8 @@ class ExternalCatalogTableBuilder(private val 
connectorDescriptor: ConnectorDesc
     * Explicitly declares this external table for supporting only batch 
environments.
     */
   def supportsBatch(): ExternalCatalogTableBuilder = {
-    isBatch = false
-    isStreaming = true
+    isBatch = true
+    isStreaming = false
     this
   }
 
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/BatchTableSinkFactory.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/BatchTableSinkFactory.scala
index 6fd1f7afb16..0b685239e05 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/BatchTableSinkFactory.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/BatchTableSinkFactory.scala
@@ -23,7 +23,7 @@ import java.util
 import org.apache.flink.table.sinks.BatchTableSink
 
 /**
-  * A factory to create configured table sink instances in a streaming 
environment based on
+  * A factory to create configured table sink instances in a batch environment 
based on
   * string-based properties. See also [[TableFactory]] for more information.
   *
   * @tparam T type of records that the factory consumes
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/ExternalCatalogFactory.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/ExternalCatalogFactory.scala
new file mode 100644
index 00000000000..ffc8d0bb1f3
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/ExternalCatalogFactory.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories
+
+import java.util
+
+import org.apache.flink.table.catalog.ExternalCatalog
+
+/**
+  * A factory to create configured external catalog instances based on 
string-based properties. See
+  * also [[TableFactory]] for more information.
+  */
+trait ExternalCatalogFactory extends TableFactory {
+
+  /**
+    * Creates and configures an 
[[org.apache.flink.table.catalog.ExternalCatalog]]
+    * using the given properties.
+    *
+    * @param properties normalized properties describing an external catalog.
+    * @return the configured external catalog.
+    */
+  def createExternalCatalog(properties: util.Map[String, String]): 
ExternalCatalog
+}
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
index 10b6a67ce6e..dfd36c5319f 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
@@ -22,6 +22,7 @@ import java.util.{ServiceConfigurationError, ServiceLoader, 
Map => JMap}
 
 import org.apache.flink.table.api._
 import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.ExternalCatalogDescriptorValidator._
 import org.apache.flink.table.descriptors.FormatDescriptorValidator._
 import org.apache.flink.table.descriptors.MetadataValidator._
 import org.apache.flink.table.descriptors.StatisticsValidator._
@@ -205,6 +206,7 @@ object TableFactoryService extends Logging {
       plainContext.remove(FORMAT_PROPERTY_VERSION)
       plainContext.remove(METADATA_PROPERTY_VERSION)
       plainContext.remove(STATISTICS_PROPERTY_VERSION)
+      plainContext.remove(CATALOG_PROPERTY_VERSION)
 
       // check if required context is met
       plainContext.forall(e => properties.contains(e._1) && properties(e._1) 
== e._2)
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryUtil.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryUtil.scala
index b773684b88f..2376c7a1292 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryUtil.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryUtil.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.table.factories
 
 import org.apache.flink.table.api.{BatchTableEnvironment, 
StreamTableEnvironment, TableEnvironment, TableException}
+import org.apache.flink.table.catalog.ExternalCatalog
 import org.apache.flink.table.descriptors.Descriptor
 import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.sources.TableSource
@@ -28,6 +29,21 @@ import org.apache.flink.table.sources.TableSource
   */
 object TableFactoryUtil {
 
+  /**
+    * Returns an external catalog for a table environment.
+    */
+  def findAndCreateExternalCatalog(
+      tableEnvironment: TableEnvironment,
+      descriptor: Descriptor)
+    : ExternalCatalog = {
+
+    val javaMap = descriptor.toProperties
+
+    TableFactoryService
+      .find(classOf[ExternalCatalogFactory], javaMap)
+      .createExternalCatalog(javaMap)
+  }
+
   /**
     * Returns a table source for a table environment.
     */
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/descriptors/ExternalCatalogDescriptorTest.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/descriptors/ExternalCatalogDescriptorTest.java
new file mode 100644
index 00000000000..1f3fc7bb0c3
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/descriptors/ExternalCatalogDescriptorTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.table.api.ValidationException;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.flink.table.descriptors.ExternalCatalogDescriptorValidator.CATALOG_PROPERTY_VERSION;
+import static 
org.apache.flink.table.descriptors.ExternalCatalogDescriptorValidator.CATALOG_TYPE;
+
+/**
+ * Tests for the {@link ExternalCatalogDescriptor} descriptor and {@link 
ExternalCatalogDescriptorValidator} validator.
+ */
+public class ExternalCatalogDescriptorTest extends DescriptorTestBase {
+
+       private static final String CATALOG_TYPE_VALUE = 
"ExternalCatalogDescriptorTest";
+       private static final int CATALOG_PROPERTY_VERSION_VALUE = 1;
+       private static final String CATALOG_FOO = "foo";
+       private static final String CATALOG_FOO_VALUE = "foo-1";
+
+       @Test(expected = ValidationException.class)
+       public void testMissingCatalogType() {
+               removePropertyAndVerify(descriptors().get(0), CATALOG_TYPE);
+       }
+
+       @Test(expected = ValidationException.class)
+       public void testMissingFoo() {
+               removePropertyAndVerify(descriptors().get(0), CATALOG_FOO);
+       }
+
+       @Override
+       protected List<Descriptor> descriptors() {
+               final Descriptor minimumDesc = new 
TestExternalCatalogDescriptor(CATALOG_FOO_VALUE);
+               return Collections.singletonList(minimumDesc);
+       }
+
+       @Override
+       protected List<Map<String, String>> properties() {
+               final Map<String, String> minimumProps = new HashMap<>();
+               minimumProps.put(CATALOG_TYPE, CATALOG_TYPE_VALUE);
+               minimumProps.put(CATALOG_PROPERTY_VERSION, "" + 
CATALOG_PROPERTY_VERSION_VALUE);
+               minimumProps.put(CATALOG_FOO, CATALOG_FOO_VALUE);
+               return Collections.singletonList(minimumProps);
+       }
+
+       @Override
+       protected DescriptorValidator validator() {
+               return new TestExternalCatalogDescriptorValidator();
+       }
+
+       class TestExternalCatalogDescriptor extends ExternalCatalogDescriptor {
+               private String foo;
+
+               public TestExternalCatalogDescriptor(@Nullable String foo) {
+                       super(CATALOG_TYPE_VALUE, 
CATALOG_PROPERTY_VERSION_VALUE);
+                       this.foo = foo;
+               }
+
+               @Override
+               protected Map<String, String> toCatalogProperties() {
+                       DescriptorProperties properties = new 
DescriptorProperties();
+                       if (foo != null) {
+                               properties.putString(CATALOG_FOO, foo);
+                       }
+                       return properties.asMap();
+               }
+       }
+
+       class TestExternalCatalogDescriptorValidator extends 
ExternalCatalogDescriptorValidator {
+               @Override
+               public void validate(DescriptorProperties properties) {
+                       super.validate(properties);
+                       properties.validateString(CATALOG_FOO, false, 1);
+               }
+       }
+}
diff --git 
a/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
 
b/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index c97fe8e9945..3a79ceecf9e 100644
--- 
a/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ 
b/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -19,3 +19,4 @@ org.apache.flink.table.factories.utils.TestTableSinkFactory
 org.apache.flink.table.factories.utils.TestTableSourceFactory
 org.apache.flink.table.factories.utils.TestTableFormatFactory
 org.apache.flink.table.factories.utils.TestAmbiguousTableFormatFactory
+org.apache.flink.table.factories.utils.TestExternalCatalogFactory
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/ExternalCatalogFactoryServiceTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/ExternalCatalogFactoryServiceTest.scala
new file mode 100644
index 00000000000..acd119ca9ea
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/ExternalCatalogFactoryServiceTest.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories
+
+import java.util.{HashMap => JHashMap, Map => JMap}
+
+import org.apache.flink.table.api.NoMatchingTableFactoryException
+import 
org.apache.flink.table.descriptors.ExternalCatalogDescriptorValidator.{CATALOG_PROPERTY_VERSION,
 CATALOG_TYPE}
+import org.apache.flink.table.factories.utils.TestExternalCatalogFactory
+import 
org.apache.flink.table.factories.utils.TestExternalCatalogFactory.CATALOG_TYPE_VALUE_TEST
+import org.junit.Assert._
+import org.junit.Test
+
+/**
+  * Tests for testing external catalog discovery using 
[[TableFactoryService]]. The tests assume the
+  * external catalog factory [[TestExternalCatalogFactory]] is registered.
+  */
+class ExternalCatalogFactoryServiceTest {
+
+  @Test
+  def testValidProperties(): Unit = {
+    val props = properties()
+    assertTrue(TableFactoryService.find(classOf[ExternalCatalogFactory], props)
+      .isInstanceOf[TestExternalCatalogFactory])
+  }
+
+  @Test(expected = classOf[NoMatchingTableFactoryException])
+  def testInvalidContext(): Unit = {
+    val props = properties()
+    props.put(CATALOG_TYPE, "unknown-catalog-type")
+    TableFactoryService.find(classOf[ExternalCatalogFactory], props)
+  }
+
+  @Test
+  def testDifferentContextVersion(): Unit = {
+    val props = properties()
+    props.put(CATALOG_PROPERTY_VERSION, "2")
+    // the external catalog should still be found
+    assertTrue(TableFactoryService.find(classOf[ExternalCatalogFactory], props)
+      .isInstanceOf[TestExternalCatalogFactory])
+  }
+
+  @Test(expected = classOf[NoMatchingTableFactoryException])
+  def testUnsupportedProperty(): Unit = {
+    val props = properties()
+    props.put("unknown-property", "/new/path")
+    TableFactoryService.find(classOf[ExternalCatalogFactory], props)
+  }
+
+  private def properties(): JMap[String, String] = {
+    val properties = new JHashMap[String, String]()
+    properties.put(CATALOG_TYPE, CATALOG_TYPE_VALUE_TEST)
+    properties.put(CATALOG_PROPERTY_VERSION, "1")
+    properties
+  }
+}
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestExternalCatalogFactory.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestExternalCatalogFactory.scala
new file mode 100644
index 00000000000..e2c1d913fd6
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestExternalCatalogFactory.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.utils
+
+import java.util
+import java.util.Collections
+
+import org.apache.flink.table.catalog.ExternalCatalog
+import 
org.apache.flink.table.descriptors.ExternalCatalogDescriptorValidator.{CATALOG_PROPERTY_VERSION,
 CATALOG_TYPE}
+import org.apache.flink.table.factories.utils.TestExternalCatalogFactory._
+import org.apache.flink.table.factories.ExternalCatalogFactory
+import org.apache.flink.table.runtime.utils.CommonTestData
+
+/**
+  * External catalog factory for testing.
+  *
+  * This factory provides the in-memory catalog from 
[[CommonTestData.getInMemoryTestCatalog()]] as catalog type "test".
+  * Note that the provided catalog tables support only streaming environments.
+  */
+class TestExternalCatalogFactory extends ExternalCatalogFactory {
+
+  override def requiredContext: util.Map[String, String] = {
+    val context = new util.HashMap[String, String]
+    context.put(CATALOG_TYPE, CATALOG_TYPE_VALUE_TEST)
+    context.put(CATALOG_PROPERTY_VERSION, "1")
+    context
+  }
+
+  override def supportedProperties: util.List[String] = Collections.emptyList()
+
+  override def createExternalCatalog(properties: util.Map[String, String]): 
ExternalCatalog = {
+    CommonTestData.getInMemoryTestCatalog(isStreaming = true)
+  }
+}
+
+object TestExternalCatalogFactory {
+  val CATALOG_TYPE_VALUE_TEST = "test"
+}
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
index 64fcc8ac7c4..1209595837b 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
@@ -85,7 +85,9 @@ object CommonTestData {
       .withSchema(schemaDesc1)
 
     if (isStreaming) {
-      externalTableBuilder1.inAppendMode()
+      externalTableBuilder1.supportsStreaming().inAppendMode()
+    } else {
+      externalTableBuilder1.supportsBatch()
     }
 
     val csvRecord2 = Seq(
@@ -126,7 +128,9 @@ object CommonTestData {
       .withSchema(schemaDesc2)
 
     if (isStreaming) {
-      externalTableBuilder2.inAppendMode()
+      externalTableBuilder2.supportsStreaming().inAppendMode()
+    } else {
+      externalTableBuilder2.supportsBatch()
     }
 
     val tempFilePath3 = writeToTempFile("", "csv-test3", "tmp")
@@ -145,7 +149,9 @@ object CommonTestData {
       .withSchema(schemaDesc3)
 
     if (isStreaming) {
-      externalTableBuilder3.inAppendMode()
+      externalTableBuilder3.supportsStreaming().inAppendMode()
+    } else {
+      externalTableBuilder3.supportsBatch()
     }
 
     val catalog = new InMemoryExternalCatalog("test")


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to