This is an automated email from the ASF dual-hosted git repository.
rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b1192e8a8e8 [FLINK-39029] move catalog store factory builder to new
class (#27531)
b1192e8a8e8 is described below
commit b1192e8a8e8d2756388394acf42d58a38b419cd3
Author: Hao Li <[email protected]>
AuthorDate: Thu Feb 19 10:38:22 2026 -0800
[FLINK-39029] move catalog store factory builder to new class (#27531)
* [FLINK-39029] move catalog store factory builder to new class
---
.gitignore | 1 +
.../gateway/service/context/SessionContext.java | 5 +-
.../java/internal/StreamTableEnvironmentImpl.java | 17 +--
.../flink/table/api/EnvironmentSettings.java | 5 +-
.../table/api/internal/TableEnvironmentImpl.java | 17 +--
.../flink/table/factories/ApiFactoryUtil.java | 130 ++++++++++++++++++++
.../flink/table/factories/TableFactoryUtil.java | 57 +--------
.../flink/table/factories/ApiFactoryUtilTest.java | 136 +++++++++++++++++++++
.../internal/StreamTableEnvironmentImpl.scala | 23 ++--
9 files changed, 304 insertions(+), 87 deletions(-)
diff --git a/.gitignore b/.gitignore
index 7f204c296f3..0ed273dea3d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,6 +7,7 @@ scalastyle-output.xml
!.idea/vcs.xml
!.idea/icon.png
.vscode
+.claude
.metals
.bloop
.cursor
diff --git
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
index 12bfc2d8e5a..b5463d44267 100644
---
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
+++
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
@@ -31,6 +31,7 @@ import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.CatalogStoreHolder;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.factories.ApiFactoryUtil;
import org.apache.flink.table.factories.CatalogStoreFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.TableFactoryUtil;
@@ -390,9 +391,9 @@ public class SessionContext {
SessionEnvironment environment) {
CatalogStoreFactory catalogStoreFactory =
-
TableFactoryUtil.findAndCreateCatalogStoreFactory(configuration,
userClassLoader);
+ ApiFactoryUtil.findAndCreateCatalogStoreFactory(configuration,
userClassLoader);
CatalogStoreFactory.Context catalogStoreFactoryContext =
-
TableFactoryUtil.buildCatalogStoreFactoryContext(configuration,
userClassLoader);
+ ApiFactoryUtil.buildCatalogStoreFactoryContext(configuration,
userClassLoader);
catalogStoreFactory.open(catalogStoreFactoryContext);
CatalogStoreHolder catalogStore =
CatalogStoreHolder.newBuilder()
diff --git
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
index 2bba1d76d69..810546ed6a6 100644
---
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
+++
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
@@ -46,6 +46,7 @@ import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.ApiFactoryUtil;
import org.apache.flink.table.factories.CatalogStoreFactory;
import org.apache.flink.table.factories.PlannerFactoryUtil;
import org.apache.flink.table.factories.TableFactoryUtil;
@@ -112,17 +113,11 @@ public final class StreamTableEnvironmentImpl extends
AbstractStreamTableEnviron
new ResourceManager(settings.getConfiguration(),
userClassLoader);
final ModuleManager moduleManager = new ModuleManager();
- final CatalogStoreFactory catalogStoreFactory =
- TableFactoryUtil.findAndCreateCatalogStoreFactory(
- settings.getConfiguration(), userClassLoader);
- final CatalogStoreFactory.Context catalogStoreFactoryContext =
- TableFactoryUtil.buildCatalogStoreFactoryContext(
- settings.getConfiguration(), userClassLoader);
- catalogStoreFactory.open(catalogStoreFactoryContext);
- final CatalogStore catalogStore =
- settings.getCatalogStore() != null
- ? settings.getCatalogStore()
- : catalogStoreFactory.createCatalogStore();
+ final ApiFactoryUtil.CatalogStoreResult catalogStoreResult =
+ ApiFactoryUtil.getOrCreateCatalogStore(
+ settings.getCatalogStore(),
settings.getConfiguration(), userClassLoader);
+ final CatalogStore catalogStore = catalogStoreResult.getCatalogStore();
+ final CatalogStoreFactory catalogStoreFactory =
catalogStoreResult.getCatalogStoreFactory();
final CatalogManager catalogManager =
CatalogManager.newBuilder()
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
index 67fcfcec463..e08704a9b99 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
@@ -144,9 +144,8 @@ public class EnvironmentSettings {
}
@Internal
- @Nullable
- public CatalogStore getCatalogStore() {
- return catalogStore;
+ public Optional<CatalogStore> getCatalogStore() {
+ return Optional.ofNullable(catalogStore);
}
@Internal
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 6a40fa9c238..388b4f969db 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -83,6 +83,7 @@ import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ModelReferenceExpression;
import org.apache.flink.table.expressions.TableReferenceExpression;
import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor;
+import org.apache.flink.table.factories.ApiFactoryUtil;
import org.apache.flink.table.factories.CatalogStoreFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.PlannerFactoryUtil;
@@ -251,17 +252,11 @@ public class TableEnvironmentImpl implements
TableEnvironmentInternal {
userClassLoader, ExecutorFactory.class,
ExecutorFactory.DEFAULT_IDENTIFIER);
final Executor executor =
executorFactory.create(settings.getConfiguration());
- final CatalogStoreFactory catalogStoreFactory =
- TableFactoryUtil.findAndCreateCatalogStoreFactory(
- settings.getConfiguration(), userClassLoader);
- final CatalogStoreFactory.Context context =
- TableFactoryUtil.buildCatalogStoreFactoryContext(
- settings.getConfiguration(), userClassLoader);
- catalogStoreFactory.open(context);
- final CatalogStore catalogStore =
- settings.getCatalogStore() != null
- ? settings.getCatalogStore()
- : catalogStoreFactory.createCatalogStore();
+ final ApiFactoryUtil.CatalogStoreResult catalogStoreResult =
+ ApiFactoryUtil.getOrCreateCatalogStore(
+ settings.getCatalogStore(),
settings.getConfiguration(), userClassLoader);
+ final CatalogStore catalogStore = catalogStoreResult.getCatalogStore();
+ final CatalogStoreFactory catalogStoreFactory =
catalogStoreResult.getCatalogStoreFactory();
// use configuration to init table config
final TableConfig tableConfig = TableConfig.getDefault();
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ApiFactoryUtil.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ApiFactoryUtil.java
new file mode 100644
index 00000000000..cb3b9189d46
--- /dev/null
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ApiFactoryUtil.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.table.catalog.CatalogStore;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** Utility for dealing with catalog store factories. */
+@Internal
+public class ApiFactoryUtil {
+
+ /** Result holder for catalog store and factory. */
+ @Internal
+ public static class CatalogStoreResult {
+ private final CatalogStore catalogStore;
+ @Nullable private final CatalogStoreFactory catalogStoreFactory;
+
+ public CatalogStoreResult(
+ CatalogStore catalogStore, @Nullable CatalogStoreFactory
catalogStoreFactory) {
+ this.catalogStore = catalogStore;
+ this.catalogStoreFactory = catalogStoreFactory;
+ }
+
+ public CatalogStore getCatalogStore() {
+ return catalogStore;
+ }
+
+ @Nullable
+ public CatalogStoreFactory getCatalogStoreFactory() {
+ return catalogStoreFactory;
+ }
+ }
+
+ /**
+ * Gets or creates a {@link CatalogStore}. If a catalog store is provided
in settings, it will
+ * be used directly. Otherwise, a new catalog store will be created using
the factory.
+ *
+ * @param providedCatalogStore the catalog store from settings, if present
+ * @param configuration the configuration
+ * @param classLoader the user classloader
+ * @return a result containing the catalog store and factory (factory is
null if store was
+ * provided)
+ */
+ public static CatalogStoreResult getOrCreateCatalogStore(
+ Optional<CatalogStore> providedCatalogStore,
+ Configuration configuration,
+ ClassLoader classLoader) {
+ if (providedCatalogStore.isPresent()) {
+ return new CatalogStoreResult(providedCatalogStore.get(), null);
+ } else {
+ CatalogStoreFactory catalogStoreFactory =
+ findAndCreateCatalogStoreFactory(configuration,
classLoader);
+ CatalogStoreFactory.Context catalogStoreFactoryContext =
+ buildCatalogStoreFactoryContext(configuration,
classLoader);
+ catalogStoreFactory.open(catalogStoreFactoryContext);
+ CatalogStore catalogStore =
catalogStoreFactory.createCatalogStore();
+ return new CatalogStoreResult(catalogStore, catalogStoreFactory);
+ }
+ }
+
+ /**
+ * Finds and creates a {@link CatalogStoreFactory} using the provided
{@link Configuration} and
+ * user classloader.
+ *
+ * <p>The configuration format should be as follows:
+ *
+ * <pre>{@code
+ * table.catalog-store.kind: {identifier}
+ * table.catalog-store.{identifier}.{param1}: xxx
+ * table.catalog-store.{identifier}.{param2}: xxx
+ * }</pre>
+ */
+ public static CatalogStoreFactory findAndCreateCatalogStoreFactory(
+ Configuration configuration, ClassLoader classLoader) {
+ String identifier =
configuration.get(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND);
+
+ CatalogStoreFactory catalogStoreFactory =
+ FactoryUtil.discoverFactory(classLoader,
CatalogStoreFactory.class, identifier);
+
+ return catalogStoreFactory;
+ }
+
+ /**
+ * Builds a {@link CatalogStoreFactory.Context} for opening the {@link
CatalogStoreFactory}.
+ *
+ * <p>The configuration format should be as follows:
+ *
+ * <pre>{@code
+ * table.catalog-store.kind: {identifier}
+ * table.catalog-store.{identifier}.{param1}: xxx
+ * table.catalog-store.{identifier}.{param2}: xxx
+ * }</pre>
+ */
+ public static CatalogStoreFactory.Context buildCatalogStoreFactoryContext(
+ Configuration configuration, ClassLoader classLoader) {
+ String identifier =
configuration.get(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND);
+ String catalogStoreOptionPrefix =
+ CommonCatalogOptions.TABLE_CATALOG_STORE_OPTION_PREFIX +
identifier + ".";
+ Map<String, String> options =
+ new DelegatingConfiguration(configuration,
catalogStoreOptionPrefix).toMap();
+ CatalogStoreFactory.Context context =
+ new FactoryUtil.DefaultCatalogStoreContext(options,
configuration, classLoader);
+
+ return context;
+ }
+}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
index ad681b54394..09e3951fe40 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
@@ -19,14 +19,11 @@
package org.apache.flink.table.factories;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.DelegatingConfiguration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.listener.CatalogModificationListener;
@@ -42,10 +39,14 @@ import org.apache.flink.table.legacy.sources.TableSource;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.stream.Collectors;
-/** Utility for dealing with {@link TableFactory} using the {@link
TableFactoryService}. */
+/**
+ * Utility for dealing with {@link TableFactory} using the {@link
TableFactoryService}.
+ *
+ * @deprecated Use {@link FactoryUtil} instead.
+ */
+@Deprecated
@Internal
public class TableFactoryUtil {
@@ -174,50 +175,4 @@ public class TableFactoryUtil {
}))
.collect(Collectors.toList());
}
-
- /**
- * Finds and creates a {@link CatalogStoreFactory} using the provided
{@link Configuration} and
- * user classloader.
- *
- * <p>The configuration format should be as follows:
- *
- * <pre>{@code
- * table.catalog-store.kind: {identifier}
- * table.catalog-store.{identifier}.{param1}: xxx
- * table.catalog-store.{identifier}.{param2}: xxx
- * }</pre>
- */
- public static CatalogStoreFactory findAndCreateCatalogStoreFactory(
- Configuration configuration, ClassLoader classLoader) {
- String identifier =
configuration.get(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND);
-
- CatalogStoreFactory catalogStoreFactory =
- FactoryUtil.discoverFactory(classLoader,
CatalogStoreFactory.class, identifier);
-
- return catalogStoreFactory;
- }
-
- /**
- * Build a {@link CatalogStoreFactory.Context} for opening the {@link
CatalogStoreFactory}.
- *
- * <p>The configuration format should be as follows:
- *
- * <pre>{@code
- * table.catalog-store.kind: {identifier}
- * table.catalog-store.{identifier}.{param1}: xxx
- * table.catalog-store.{identifier}.{param2}: xxx
- * }</pre>
- */
- public static CatalogStoreFactory.Context buildCatalogStoreFactoryContext(
- Configuration configuration, ClassLoader classLoader) {
- String identifier =
configuration.get(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND);
- String catalogStoreOptionPrefix =
- CommonCatalogOptions.TABLE_CATALOG_STORE_OPTION_PREFIX +
identifier + ".";
- Map<String, String> options =
- new DelegatingConfiguration(configuration,
catalogStoreOptionPrefix).toMap();
- CatalogStoreFactory.Context context =
- new FactoryUtil.DefaultCatalogStoreContext(options,
configuration, classLoader);
-
- return context;
- }
}
diff --git
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ApiFactoryUtilTest.java
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ApiFactoryUtilTest.java
new file mode 100644
index 00000000000..c8e1908291b
--- /dev/null
+++
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ApiFactoryUtilTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.catalog.FileCatalogStoreFactory;
+import org.apache.flink.table.catalog.GenericInMemoryCatalogStoreFactory;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.File;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link ApiFactoryUtil}. */
+class ApiFactoryUtilTest {
+
+ @ParameterizedTest(name = "kind={0}, expectedFactory={1}")
+ @MethodSource("catalogStoreFactoryTestParameters")
+ void testFindAndCreateCatalogStoreFactory(String kind, Class<?>
expectedFactoryClass) {
+ Configuration configuration = new Configuration();
+ if (kind != null) {
+ configuration.set(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND,
kind);
+ }
+ ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
+
+ CatalogStoreFactory factory =
+ ApiFactoryUtil.findAndCreateCatalogStoreFactory(configuration,
classLoader);
+
+ assertThat(factory).isInstanceOf(expectedFactoryClass);
+ }
+
+ @Test
+ void testBuildCatalogStoreFactoryContext(@TempDir File tempFolder) {
+ Configuration configuration = new Configuration();
+ configuration.set(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND,
"file");
+ configuration.setString("table.catalog-store.file.path",
tempFolder.getAbsolutePath());
+ configuration.setString("table.catalog-store.file.option1", "value1");
+ configuration.setString("table.catalog-store.file.option2", "value2");
+ ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
+
+ CatalogStoreFactory.Context context =
+ ApiFactoryUtil.buildCatalogStoreFactoryContext(configuration,
classLoader);
+
+ assertThat(context).isNotNull();
+ assertThat(context.getOptions())
+ .containsExactlyInAnyOrderEntriesOf(
+ Map.of(
+ "path", tempFolder.getAbsolutePath(),
+ "option1", "value1",
+ "option2", "value2"));
+ assertThat(context.getConfiguration()).isEqualTo(configuration);
+ assertThat(context.getClassLoader()).isEqualTo(classLoader);
+ }
+
+ @Test
+ void testBuildCatalogStoreFactoryContextWithGenericInMemory() {
+ Configuration configuration = new Configuration();
+ configuration.set(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND,
"generic_in_memory");
+
configuration.setString("table.catalog-store.generic_in_memory.option1",
"value1");
+ ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
+
+ CatalogStoreFactory.Context context =
+ ApiFactoryUtil.buildCatalogStoreFactoryContext(configuration,
classLoader);
+
+ assertThat(context).isNotNull();
+ assertThat(context.getOptions())
+ .containsExactlyInAnyOrderEntriesOf(Map.of("option1",
"value1"));
+ assertThat(context.getConfiguration()).isEqualTo(configuration);
+ assertThat(context.getClassLoader()).isEqualTo(classLoader);
+ }
+
+ @Test
+ void testBuildCatalogStoreFactoryContextWithoutOptions() {
+ Configuration configuration = new Configuration();
+ configuration.set(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND,
"generic_in_memory");
+ ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
+
+ CatalogStoreFactory.Context context =
+ ApiFactoryUtil.buildCatalogStoreFactoryContext(configuration,
classLoader);
+
+ assertThat(context).isNotNull();
+ assertThat(context.getOptions()).isEmpty();
+ assertThat(context.getConfiguration()).isEqualTo(configuration);
+ assertThat(context.getClassLoader()).isEqualTo(classLoader);
+ }
+
+ @Test
+ void testBuildCatalogStoreFactoryContextOnlyExtractsRelevantOptions() {
+ Configuration configuration = new Configuration();
+ configuration.set(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND,
"file");
+ configuration.setString("table.catalog-store.file.path", "/test/path");
+ configuration.setString("table.catalog-store.file.option1", "value1");
+ configuration.setString("table.catalog-store.other.irrelevant",
"should-not-appear");
+ configuration.setString("other.config.key", "should-not-appear");
+ ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
+
+ CatalogStoreFactory.Context context =
+ ApiFactoryUtil.buildCatalogStoreFactoryContext(configuration,
classLoader);
+
+ assertThat(context).isNotNull();
+ assertThat(context.getOptions())
+ .containsExactlyInAnyOrderEntriesOf(
+ Map.of("path", "/test/path", "option1", "value1"));
+ }
+
+ private static Stream<Arguments> catalogStoreFactoryTestParameters() {
+ return Stream.of(
+ Arguments.of("generic_in_memory",
GenericInMemoryCatalogStoreFactory.class),
+ Arguments.of("file", FileCatalogStoreFactory.class),
+ Arguments.of(null, GenericInMemoryCatalogStoreFactory.class));
+ }
+}
diff --git
a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
index 7e86c5bf256..c8cc4e99463 100644
---
a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
+++
b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
@@ -28,7 +28,7 @@ import org.apache.flink.table.catalog._
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.table.delegation.{Executor, Planner}
import org.apache.flink.table.expressions.Expression
-import org.apache.flink.table.factories.{PlannerFactoryUtil, TableFactoryUtil}
+import org.apache.flink.table.factories.{ApiFactoryUtil, PlannerFactoryUtil,
TableFactoryUtil}
import org.apache.flink.table.functions.{AggregateFunction,
TableAggregateFunction, TableFunction, UserDefinedFunctionHelper}
import org.apache.flink.table.legacy.sources.TableSource
import org.apache.flink.table.module.ModuleManager
@@ -250,14 +250,19 @@ object StreamTableEnvironmentImpl {
val resourceManager = new ResourceManager(settings.getConfiguration,
userClassLoader)
val moduleManager = new ModuleManager
- val catalogStoreFactory =
-
TableFactoryUtil.findAndCreateCatalogStoreFactory(settings.getConfiguration,
userClassLoader)
- val catalogStoreFactoryContext =
-
TableFactoryUtil.buildCatalogStoreFactoryContext(settings.getConfiguration,
userClassLoader)
- catalogStoreFactory.open(catalogStoreFactoryContext)
- val catalogStore =
- if (settings.getCatalogStore != null) settings.getCatalogStore
- else catalogStoreFactory.createCatalogStore()
+ val (catalogStore, catalogStoreFactory) =
+ if (settings.getCatalogStore.isPresent) {
+ (settings.getCatalogStore.get(), null)
+ } else {
+ val factory =
+ ApiFactoryUtil.findAndCreateCatalogStoreFactory(
+ settings.getConfiguration,
+ userClassLoader)
+ val factoryContext =
+
ApiFactoryUtil.buildCatalogStoreFactoryContext(settings.getConfiguration,
userClassLoader)
+ factory.open(factoryContext)
+ (factory.createCatalogStore(), factory)
+ }
val catalogManager = CatalogManager.newBuilder
.classLoader(userClassLoader)