EronWright closed pull request #7390: [FLINK-11237] [table] External Catalog
Factory and Descriptor
URL: https://github.com/apache/flink/pull/7390
This pull request was merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork
once it has been merged, the diff is reproduced below for the sake of
provenance:
diff --git
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index fe438b6871e..c02de7c0196 100644
---
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -166,7 +166,7 @@ public T getClusterId() {
public EnvironmentInstance createEnvironmentInstance() {
try {
- return new EnvironmentInstance();
+ return wrapClassLoader(EnvironmentInstance::new);
} catch (Throwable t) {
// catch everything such that a wrong environment does
not affect invocations
throw new SqlExecutionException("Could not create
environment instance.", t);
diff --git
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 36fd4ee0a63..060ac4185ad 100644
---
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -193,27 +193,31 @@ public void start() {
@Override
public List<String> listTables(SessionContext session) throws
SqlExecutionException {
- final TableEnvironment tableEnv =
getOrCreateExecutionContext(session)
+ final ExecutionContext<?> context =
getOrCreateExecutionContext(session);
+ final TableEnvironment tableEnv = context
.createEnvironmentInstance()
.getTableEnvironment();
- return Arrays.asList(tableEnv.listTables());
+ return context.wrapClassLoader(() ->
Arrays.asList(tableEnv.listTables()));
}
@Override
public List<String> listUserDefinedFunctions(SessionContext session)
throws SqlExecutionException {
- final TableEnvironment tableEnv =
getOrCreateExecutionContext(session)
+ final ExecutionContext<?> context =
getOrCreateExecutionContext(session);
+ final TableEnvironment tableEnv = context
.createEnvironmentInstance()
.getTableEnvironment();
- return Arrays.asList(tableEnv.listUserDefinedFunctions());
+ return context.wrapClassLoader(() ->
Arrays.asList(tableEnv.listUserDefinedFunctions()));
}
@Override
public TableSchema getTableSchema(SessionContext session, String name)
throws SqlExecutionException {
- final TableEnvironment tableEnv =
getOrCreateExecutionContext(session)
+ final ExecutionContext<?> context =
getOrCreateExecutionContext(session);
+ final TableEnvironment tableEnv = context
.createEnvironmentInstance()
.getTableEnvironment();
try {
- return tableEnv.scan(name).getSchema();
+ // scanning requires table resolution step that might
reference external tables
+ return context.wrapClassLoader(() ->
tableEnv.scan(name).getSchema());
} catch (Throwable t) {
// catch everything such that the query does not crash
the executor
throw new SqlExecutionException("No table with this
name could be found.", t);
@@ -229,7 +233,7 @@ public String explainStatement(SessionContext session,
String statement) throws
// translate
try {
- final Table table = createTable(tableEnv, statement);
+ final Table table = createTable(context, tableEnv,
statement);
// explanation requires an optimization step that might
reference UDFs during code compilation
return context.wrapClassLoader(() ->
tableEnv.explain(table));
} catch (Throwable t) {
@@ -240,12 +244,14 @@ public String explainStatement(SessionContext session,
String statement) throws
@Override
public List<String> completeStatement(SessionContext session, String
statement, int position) {
- final TableEnvironment tableEnv =
getOrCreateExecutionContext(session)
+ final ExecutionContext<?> context =
getOrCreateExecutionContext(session);
+ final TableEnvironment tableEnv = context
.createEnvironmentInstance()
.getTableEnvironment();
try {
- return
Arrays.asList(tableEnv.getCompletionHints(statement, position));
+ // planning requires table resolution step that might
reference external tables
+ return context.wrapClassLoader(() ->
Arrays.asList(tableEnv.getCompletionHints(statement, position)));
} catch (Throwable t) {
// catch everything such that the query does not crash
the executor
if (LOG.isDebugEnabled()) {
@@ -402,7 +408,7 @@ public void stop(SessionContext session) {
final ExecutionContext.EnvironmentInstance envInst =
context.createEnvironmentInstance();
// create table
- final Table table = createTable(envInst.getTableEnvironment(),
query);
+ final Table table = createTable(context,
envInst.getTableEnvironment(), query);
// initialize result
final DynamicResult<C> result = resultStore.createResult(
@@ -448,10 +454,11 @@ public void stop(SessionContext session) {
/**
* Creates a table using the given query in the given table environment.
*/
- private Table createTable(TableEnvironment tableEnv, String
selectQuery) {
+ private <C> Table createTable(ExecutionContext<C> context,
TableEnvironment tableEnv, String selectQuery) {
// parse and validate query
try {
- return tableEnv.sqlQuery(selectQuery);
+ // query statement requires table resolution step that
might reference external tables
+ return context.wrapClassLoader(() ->
tableEnv.sqlQuery(selectQuery));
} catch (Throwable t) {
// catch everything such that the query does not crash
the executor
throw new SqlExecutionException("Invalid SQL
statement.", t);
diff --git
a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorDescriptor.java
b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorDescriptor.java
index a1d3e783cdc..ed62b46b36b 100644
---
a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorDescriptor.java
+++
b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorDescriptor.java
@@ -68,7 +68,7 @@ protected final boolean isFormatNeeded() {
/**
* Converts this descriptor into a set of connector properties. Usually
prefixed with
- * {@link FormatDescriptorValidator#FORMAT}.
+ * {@link ConnectorDescriptorValidator#CONNECTOR}.
*/
protected abstract Map<String, String> toConnectorProperties();
}
diff --git
a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.java
b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.java
index 88cf34905f4..cf9860ae69e 100644
---
a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.java
+++
b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.java
@@ -26,6 +26,11 @@
@Internal
public abstract class ConnectorDescriptorValidator implements
DescriptorValidator {
+ /**
+ * Prefix for connector-related properties.
+ */
+ public static final String CONNECTOR = "connector";
+
/**
* Key for describing the type of the connector. Usually used for
factory discovery.
*/
diff --git
a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/descriptors/ExternalCatalogDescriptor.java
b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/descriptors/ExternalCatalogDescriptor.java
new file mode 100644
index 00000000000..5ee7f06a9dd
--- /dev/null
+++
b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/descriptors/ExternalCatalogDescriptor.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Map;
+
+import static
org.apache.flink.table.descriptors.ExternalCatalogDescriptorValidator.CATALOG_PROPERTY_VERSION;
+import static
org.apache.flink.table.descriptors.ExternalCatalogDescriptorValidator.CATALOG_TYPE;
+
+/**
+ * Describes an external catalog of tables, views, and functions.
+ */
+@PublicEvolving
+public abstract class ExternalCatalogDescriptor extends DescriptorBase
implements Descriptor {
+
+ private final String type;
+
+ private final int version;
+
+ /**
+ * Constructs a {@link ExternalCatalogDescriptor}.
+ *
+ * @param type string that identifies this catalog
+ * @param version property version for backwards compatibility
+ */
+ public ExternalCatalogDescriptor(String type, int version) {
+ this.type = type;
+ this.version = version;
+ }
+
+ @Override
+ public final Map<String, String> toProperties() {
+ final DescriptorProperties properties = new
DescriptorProperties();
+ properties.putString(CATALOG_TYPE, type);
+ properties.putLong(CATALOG_PROPERTY_VERSION, version);
+ properties.putProperties(toCatalogProperties());
+ return properties.asMap();
+ }
+
+ /**
+ * Converts this descriptor into a set of catalog properties.
+ */
+ protected abstract Map<String, String> toCatalogProperties();
+}
diff --git
a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/descriptors/ExternalCatalogDescriptorValidator.java
b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/descriptors/ExternalCatalogDescriptorValidator.java
new file mode 100644
index 00000000000..6cee73c5a16
--- /dev/null
+++
b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/descriptors/ExternalCatalogDescriptorValidator.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Validator for {@link ExternalCatalogDescriptor}.
+ */
+@Internal
+public abstract class ExternalCatalogDescriptorValidator implements
DescriptorValidator {
+
+ /**
+ * Key for describing the type of the catalog. Usually used for factory
discovery.
+ */
+ public static final String CATALOG_TYPE = "type";
+
+ /**
+ * Key for describing the property version. This property can be used
for backwards
+ * compatibility in case the property format changes.
+ */
+ public static final String CATALOG_PROPERTY_VERSION =
"property-version";
+
+ @Override
+ public void validate(DescriptorProperties properties) {
+ properties.validateString(CATALOG_TYPE, false, 1);
+ properties.validateInt(CATALOG_PROPERTY_VERSION, true, 0);
+ }
+}
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
index 45414ee3ff6..ce57070ac98 100644
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
+++
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
@@ -270,8 +270,8 @@ class ExternalCatalogTableBuilder(private val
connectorDescriptor: ConnectorDesc
* Explicitly declares this external table for supporting only batch
environments.
*/
def supportsBatch(): ExternalCatalogTableBuilder = {
- isBatch = false
- isStreaming = true
+ isBatch = true
+ isStreaming = false
this
}
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/BatchTableSinkFactory.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/BatchTableSinkFactory.scala
index 6fd1f7afb16..0b685239e05 100644
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/BatchTableSinkFactory.scala
+++
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/BatchTableSinkFactory.scala
@@ -23,7 +23,7 @@ import java.util
import org.apache.flink.table.sinks.BatchTableSink
/**
- * A factory to create configured table sink instances in a streaming
environment based on
+ * A factory to create configured table sink instances in a batch environment
based on
* string-based properties. See also [[TableFactory]] for more information.
*
* @tparam T type of records that the factory consumes
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/ExternalCatalogFactory.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/ExternalCatalogFactory.scala
new file mode 100644
index 00000000000..ffc8d0bb1f3
--- /dev/null
+++
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/ExternalCatalogFactory.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories
+
+import java.util
+
+import org.apache.flink.table.catalog.ExternalCatalog
+
+/**
+ * A factory to create configured external catalog instances based on
string-based properties. See
+ * also [[TableFactory]] for more information.
+ */
+trait ExternalCatalogFactory extends TableFactory {
+
+ /**
+ * Creates and configures an
[[org.apache.flink.table.catalog.ExternalCatalog]]
+ * using the given properties.
+ *
+ * @param properties normalized properties describing an external catalog.
+ * @return the configured external catalog.
+ */
+ def createExternalCatalog(properties: util.Map[String, String]):
ExternalCatalog
+}
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
index 10b6a67ce6e..dfd36c5319f 100644
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
+++
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
@@ -22,6 +22,7 @@ import java.util.{ServiceConfigurationError, ServiceLoader,
Map => JMap}
import org.apache.flink.table.api._
import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.ExternalCatalogDescriptorValidator._
import org.apache.flink.table.descriptors.FormatDescriptorValidator._
import org.apache.flink.table.descriptors.MetadataValidator._
import org.apache.flink.table.descriptors.StatisticsValidator._
@@ -205,6 +206,7 @@ object TableFactoryService extends Logging {
plainContext.remove(FORMAT_PROPERTY_VERSION)
plainContext.remove(METADATA_PROPERTY_VERSION)
plainContext.remove(STATISTICS_PROPERTY_VERSION)
+ plainContext.remove(CATALOG_PROPERTY_VERSION)
// check if required context is met
plainContext.forall(e => properties.contains(e._1) && properties(e._1)
== e._2)
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryUtil.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryUtil.scala
index b773684b88f..2376c7a1292 100644
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryUtil.scala
+++
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryUtil.scala
@@ -19,6 +19,7 @@
package org.apache.flink.table.factories
import org.apache.flink.table.api.{BatchTableEnvironment,
StreamTableEnvironment, TableEnvironment, TableException}
+import org.apache.flink.table.catalog.ExternalCatalog
import org.apache.flink.table.descriptors.Descriptor
import org.apache.flink.table.sinks.TableSink
import org.apache.flink.table.sources.TableSource
@@ -28,6 +29,21 @@ import org.apache.flink.table.sources.TableSource
*/
object TableFactoryUtil {
+ /**
+ * Returns an external catalog for a table environment.
+ */
+ def findAndCreateExternalCatalog(
+ tableEnvironment: TableEnvironment,
+ descriptor: Descriptor)
+ : ExternalCatalog = {
+
+ val javaMap = descriptor.toProperties
+
+ TableFactoryService
+ .find(classOf[ExternalCatalogFactory], javaMap)
+ .createExternalCatalog(javaMap)
+ }
+
/**
* Returns a table source for a table environment.
*/
diff --git
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/descriptors/ExternalCatalogDescriptorTest.java
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/descriptors/ExternalCatalogDescriptorTest.java
new file mode 100644
index 00000000000..1f3fc7bb0c3
--- /dev/null
+++
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/descriptors/ExternalCatalogDescriptorTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.table.api.ValidationException;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.flink.table.descriptors.ExternalCatalogDescriptorValidator.CATALOG_PROPERTY_VERSION;
+import static
org.apache.flink.table.descriptors.ExternalCatalogDescriptorValidator.CATALOG_TYPE;
+
+/**
+ * Tests for the {@link ExternalCatalogDescriptor} descriptor and {@link
ExternalCatalogDescriptorValidator} validator.
+ */
+public class ExternalCatalogDescriptorTest extends DescriptorTestBase {
+
+ private static final String CATALOG_TYPE_VALUE =
"ExternalCatalogDescriptorTest";
+ private static final int CATALOG_PROPERTY_VERSION_VALUE = 1;
+ private static final String CATALOG_FOO = "foo";
+ private static final String CATALOG_FOO_VALUE = "foo-1";
+
+ @Test(expected = ValidationException.class)
+ public void testMissingCatalogType() {
+ removePropertyAndVerify(descriptors().get(0), CATALOG_TYPE);
+ }
+
+ @Test(expected = ValidationException.class)
+ public void testMissingFoo() {
+ removePropertyAndVerify(descriptors().get(0), CATALOG_FOO);
+ }
+
+ @Override
+ protected List<Descriptor> descriptors() {
+ final Descriptor minimumDesc = new
TestExternalCatalogDescriptor(CATALOG_FOO_VALUE);
+ return Collections.singletonList(minimumDesc);
+ }
+
+ @Override
+ protected List<Map<String, String>> properties() {
+ final Map<String, String> minimumProps = new HashMap<>();
+ minimumProps.put(CATALOG_TYPE, CATALOG_TYPE_VALUE);
+ minimumProps.put(CATALOG_PROPERTY_VERSION, "" +
CATALOG_PROPERTY_VERSION_VALUE);
+ minimumProps.put(CATALOG_FOO, CATALOG_FOO_VALUE);
+ return Collections.singletonList(minimumProps);
+ }
+
+ @Override
+ protected DescriptorValidator validator() {
+ return new TestExternalCatalogDescriptorValidator();
+ }
+
+ class TestExternalCatalogDescriptor extends ExternalCatalogDescriptor {
+ private String foo;
+
+ public TestExternalCatalogDescriptor(@Nullable String foo) {
+ super(CATALOG_TYPE_VALUE,
CATALOG_PROPERTY_VERSION_VALUE);
+ this.foo = foo;
+ }
+
+ @Override
+ protected Map<String, String> toCatalogProperties() {
+ DescriptorProperties properties = new
DescriptorProperties();
+ if (foo != null) {
+ properties.putString(CATALOG_FOO, foo);
+ }
+ return properties.asMap();
+ }
+ }
+
+ class TestExternalCatalogDescriptorValidator extends
ExternalCatalogDescriptorValidator {
+ @Override
+ public void validate(DescriptorProperties properties) {
+ super.validate(properties);
+ properties.validateString(CATALOG_FOO, false, 1);
+ }
+ }
+}
diff --git
a/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
b/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index c97fe8e9945..3a79ceecf9e 100644
---
a/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++
b/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -19,3 +19,4 @@ org.apache.flink.table.factories.utils.TestTableSinkFactory
org.apache.flink.table.factories.utils.TestTableSourceFactory
org.apache.flink.table.factories.utils.TestTableFormatFactory
org.apache.flink.table.factories.utils.TestAmbiguousTableFormatFactory
+org.apache.flink.table.factories.utils.TestExternalCatalogFactory
diff --git
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/ExternalCatalogFactoryServiceTest.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/ExternalCatalogFactoryServiceTest.scala
new file mode 100644
index 00000000000..acd119ca9ea
--- /dev/null
+++
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/ExternalCatalogFactoryServiceTest.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories
+
+import java.util.{HashMap => JHashMap, Map => JMap}
+
+import org.apache.flink.table.api.NoMatchingTableFactoryException
+import
org.apache.flink.table.descriptors.ExternalCatalogDescriptorValidator.{CATALOG_PROPERTY_VERSION,
CATALOG_TYPE}
+import org.apache.flink.table.factories.utils.TestExternalCatalogFactory
+import
org.apache.flink.table.factories.utils.TestExternalCatalogFactory.CATALOG_TYPE_VALUE_TEST
+import org.junit.Assert._
+import org.junit.Test
+
+/**
+ * Tests for testing external catalog discovery using
[[TableFactoryService]]. The tests assume the
+ * external catalog factory [[TestExternalCatalogFactory]] is registered.
+ */
+class ExternalCatalogFactoryServiceTest {
+
+ @Test
+ def testValidProperties(): Unit = {
+ val props = properties()
+ assertTrue(TableFactoryService.find(classOf[ExternalCatalogFactory], props)
+ .isInstanceOf[TestExternalCatalogFactory])
+ }
+
+ @Test(expected = classOf[NoMatchingTableFactoryException])
+ def testInvalidContext(): Unit = {
+ val props = properties()
+ props.put(CATALOG_TYPE, "unknown-catalog-type")
+ TableFactoryService.find(classOf[ExternalCatalogFactory], props)
+ }
+
+ @Test
+ def testDifferentContextVersion(): Unit = {
+ val props = properties()
+ props.put(CATALOG_PROPERTY_VERSION, "2")
+ // the external catalog should still be found
+ assertTrue(TableFactoryService.find(classOf[ExternalCatalogFactory], props)
+ .isInstanceOf[TestExternalCatalogFactory])
+ }
+
+ @Test(expected = classOf[NoMatchingTableFactoryException])
+ def testUnsupportedProperty(): Unit = {
+ val props = properties()
+ props.put("unknown-property", "/new/path")
+ TableFactoryService.find(classOf[ExternalCatalogFactory], props)
+ }
+
+ private def properties(): JMap[String, String] = {
+ val properties = new JHashMap[String, String]()
+ properties.put(CATALOG_TYPE, CATALOG_TYPE_VALUE_TEST)
+ properties.put(CATALOG_PROPERTY_VERSION, "1")
+ properties
+ }
+}
diff --git
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestExternalCatalogFactory.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestExternalCatalogFactory.scala
new file mode 100644
index 00000000000..e2c1d913fd6
--- /dev/null
+++
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestExternalCatalogFactory.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.utils
+
+import java.util
+import java.util.Collections
+
+import org.apache.flink.table.catalog.ExternalCatalog
+import
org.apache.flink.table.descriptors.ExternalCatalogDescriptorValidator.{CATALOG_PROPERTY_VERSION,
CATALOG_TYPE}
+import org.apache.flink.table.factories.utils.TestExternalCatalogFactory._
+import org.apache.flink.table.factories.ExternalCatalogFactory
+import org.apache.flink.table.runtime.utils.CommonTestData
+
+/**
+ * External catalog factory for testing.
+ *
+ * This factory provides the in-memory catalog from
[[CommonTestData.getInMemoryTestCatalog()]] as catalog type "test".
+ * Note that the provided catalog tables support only streaming environments.
+ */
+class TestExternalCatalogFactory extends ExternalCatalogFactory {
+
+ override def requiredContext: util.Map[String, String] = {
+ val context = new util.HashMap[String, String]
+ context.put(CATALOG_TYPE, CATALOG_TYPE_VALUE_TEST)
+ context.put(CATALOG_PROPERTY_VERSION, "1")
+ context
+ }
+
+ override def supportedProperties: util.List[String] = Collections.emptyList()
+
+ override def createExternalCatalog(properties: util.Map[String, String]):
ExternalCatalog = {
+ CommonTestData.getInMemoryTestCatalog(isStreaming = true)
+ }
+}
+
+object TestExternalCatalogFactory {
+ val CATALOG_TYPE_VALUE_TEST = "test"
+}
diff --git
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
index 64fcc8ac7c4..1209595837b 100644
---
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
+++
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
@@ -85,7 +85,9 @@ object CommonTestData {
.withSchema(schemaDesc1)
if (isStreaming) {
- externalTableBuilder1.inAppendMode()
+ externalTableBuilder1.supportsStreaming().inAppendMode()
+ } else {
+ externalTableBuilder1.supportsBatch()
}
val csvRecord2 = Seq(
@@ -126,7 +128,9 @@ object CommonTestData {
.withSchema(schemaDesc2)
if (isStreaming) {
- externalTableBuilder2.inAppendMode()
+ externalTableBuilder2.supportsStreaming().inAppendMode()
+ } else {
+ externalTableBuilder2.supportsBatch()
}
val tempFilePath3 = writeToTempFile("", "csv-test3", "tmp")
@@ -145,7 +149,9 @@ object CommonTestData {
.withSchema(schemaDesc3)
if (isStreaming) {
- externalTableBuilder3.inAppendMode()
+ externalTableBuilder3.supportsStreaming().inAppendMode()
+ } else {
+ externalTableBuilder3.supportsBatch()
}
val catalog = new InMemoryExternalCatalog("test")
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services