This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b1192e8a8e8 [FLINK-39029] move catalog store factory builder to new 
class (#27531)
b1192e8a8e8 is described below

commit b1192e8a8e8d2756388394acf42d58a38b419cd3
Author: Hao Li <[email protected]>
AuthorDate: Thu Feb 19 10:38:22 2026 -0800

    [FLINK-39029] move catalog store factory builder to new class (#27531)
    
    * [FLINK-39029] move catalog store factory builder to new class
---
 .gitignore                                         |   1 +
 .../gateway/service/context/SessionContext.java    |   5 +-
 .../java/internal/StreamTableEnvironmentImpl.java  |  17 +--
 .../flink/table/api/EnvironmentSettings.java       |   5 +-
 .../table/api/internal/TableEnvironmentImpl.java   |  17 +--
 .../flink/table/factories/ApiFactoryUtil.java      | 130 ++++++++++++++++++++
 .../flink/table/factories/TableFactoryUtil.java    |  57 +--------
 .../flink/table/factories/ApiFactoryUtilTest.java  | 136 +++++++++++++++++++++
 .../internal/StreamTableEnvironmentImpl.scala      |  23 ++--
 9 files changed, 304 insertions(+), 87 deletions(-)

diff --git a/.gitignore b/.gitignore
index 7f204c296f3..0ed273dea3d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,6 +7,7 @@ scalastyle-output.xml
 !.idea/vcs.xml
 !.idea/icon.png
 .vscode
+.claude
 .metals
 .bloop
 .cursor
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
index 12bfc2d8e5a..b5463d44267 100644
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
@@ -31,6 +31,7 @@ import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.CatalogStoreHolder;
 import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.factories.ApiFactoryUtil;
 import org.apache.flink.table.factories.CatalogStoreFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.factories.TableFactoryUtil;
@@ -390,9 +391,9 @@ public class SessionContext {
             SessionEnvironment environment) {
 
         CatalogStoreFactory catalogStoreFactory =
-                
TableFactoryUtil.findAndCreateCatalogStoreFactory(configuration, 
userClassLoader);
+                ApiFactoryUtil.findAndCreateCatalogStoreFactory(configuration, 
userClassLoader);
         CatalogStoreFactory.Context catalogStoreFactoryContext =
-                
TableFactoryUtil.buildCatalogStoreFactoryContext(configuration, 
userClassLoader);
+                ApiFactoryUtil.buildCatalogStoreFactoryContext(configuration, 
userClassLoader);
         catalogStoreFactory.open(catalogStoreFactoryContext);
         CatalogStoreHolder catalogStore =
                 CatalogStoreHolder.newBuilder()
diff --git 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
index 2bba1d76d69..810546ed6a6 100644
--- 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
@@ -46,6 +46,7 @@ import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.delegation.Executor;
 import org.apache.flink.table.delegation.Planner;
 import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.ApiFactoryUtil;
 import org.apache.flink.table.factories.CatalogStoreFactory;
 import org.apache.flink.table.factories.PlannerFactoryUtil;
 import org.apache.flink.table.factories.TableFactoryUtil;
@@ -112,17 +113,11 @@ public final class StreamTableEnvironmentImpl extends 
AbstractStreamTableEnviron
                 new ResourceManager(settings.getConfiguration(), 
userClassLoader);
         final ModuleManager moduleManager = new ModuleManager();
 
-        final CatalogStoreFactory catalogStoreFactory =
-                TableFactoryUtil.findAndCreateCatalogStoreFactory(
-                        settings.getConfiguration(), userClassLoader);
-        final CatalogStoreFactory.Context catalogStoreFactoryContext =
-                TableFactoryUtil.buildCatalogStoreFactoryContext(
-                        settings.getConfiguration(), userClassLoader);
-        catalogStoreFactory.open(catalogStoreFactoryContext);
-        final CatalogStore catalogStore =
-                settings.getCatalogStore() != null
-                        ? settings.getCatalogStore()
-                        : catalogStoreFactory.createCatalogStore();
+        final ApiFactoryUtil.CatalogStoreResult catalogStoreResult =
+                ApiFactoryUtil.getOrCreateCatalogStore(
+                        settings.getCatalogStore(), 
settings.getConfiguration(), userClassLoader);
+        final CatalogStore catalogStore = catalogStoreResult.getCatalogStore();
+        final CatalogStoreFactory catalogStoreFactory = 
catalogStoreResult.getCatalogStoreFactory();
 
         final CatalogManager catalogManager =
                 CatalogManager.newBuilder()
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
index 67fcfcec463..e08704a9b99 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
@@ -144,9 +144,8 @@ public class EnvironmentSettings {
     }
 
     @Internal
-    @Nullable
-    public CatalogStore getCatalogStore() {
-        return catalogStore;
+    public Optional<CatalogStore> getCatalogStore() {
+        return Optional.ofNullable(catalogStore);
     }
 
     @Internal
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 6a40fa9c238..388b4f969db 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -83,6 +83,7 @@ import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ModelReferenceExpression;
 import org.apache.flink.table.expressions.TableReferenceExpression;
 import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor;
+import org.apache.flink.table.factories.ApiFactoryUtil;
 import org.apache.flink.table.factories.CatalogStoreFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.factories.PlannerFactoryUtil;
@@ -251,17 +252,11 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
                         userClassLoader, ExecutorFactory.class, 
ExecutorFactory.DEFAULT_IDENTIFIER);
         final Executor executor = 
executorFactory.create(settings.getConfiguration());
 
-        final CatalogStoreFactory catalogStoreFactory =
-                TableFactoryUtil.findAndCreateCatalogStoreFactory(
-                        settings.getConfiguration(), userClassLoader);
-        final CatalogStoreFactory.Context context =
-                TableFactoryUtil.buildCatalogStoreFactoryContext(
-                        settings.getConfiguration(), userClassLoader);
-        catalogStoreFactory.open(context);
-        final CatalogStore catalogStore =
-                settings.getCatalogStore() != null
-                        ? settings.getCatalogStore()
-                        : catalogStoreFactory.createCatalogStore();
+        final ApiFactoryUtil.CatalogStoreResult catalogStoreResult =
+                ApiFactoryUtil.getOrCreateCatalogStore(
+                        settings.getCatalogStore(), 
settings.getConfiguration(), userClassLoader);
+        final CatalogStore catalogStore = catalogStoreResult.getCatalogStore();
+        final CatalogStoreFactory catalogStoreFactory = 
catalogStoreResult.getCatalogStoreFactory();
 
         // use configuration to init table config
         final TableConfig tableConfig = TableConfig.getDefault();
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ApiFactoryUtil.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ApiFactoryUtil.java
new file mode 100644
index 00000000000..cb3b9189d46
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ApiFactoryUtil.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.table.catalog.CatalogStore;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** Utility for dealing with catalog store factories. */
+@Internal
+public class ApiFactoryUtil {
+
+    /** Result holder for catalog store and factory. */
+    @Internal
+    public static class CatalogStoreResult {
+        private final CatalogStore catalogStore;
+        @Nullable private final CatalogStoreFactory catalogStoreFactory;
+
+        public CatalogStoreResult(
+                CatalogStore catalogStore, @Nullable CatalogStoreFactory 
catalogStoreFactory) {
+            this.catalogStore = catalogStore;
+            this.catalogStoreFactory = catalogStoreFactory;
+        }
+
+        public CatalogStore getCatalogStore() {
+            return catalogStore;
+        }
+
+        @Nullable
+        public CatalogStoreFactory getCatalogStoreFactory() {
+            return catalogStoreFactory;
+        }
+    }
+
+    /**
+     * Gets or creates a {@link CatalogStore}. If a catalog store is provided 
in settings, it will
+     * be used directly. Otherwise, a new catalog store will be created using 
the factory.
+     *
+     * @param providedCatalogStore the catalog store from settings, if present
+     * @param configuration the configuration
+     * @param classLoader the user classloader
+     * @return a result containing the catalog store and factory (factory is 
null if store was
+     *     provided)
+     */
+    public static CatalogStoreResult getOrCreateCatalogStore(
+            Optional<CatalogStore> providedCatalogStore,
+            Configuration configuration,
+            ClassLoader classLoader) {
+        if (providedCatalogStore.isPresent()) {
+            return new CatalogStoreResult(providedCatalogStore.get(), null);
+        } else {
+            CatalogStoreFactory catalogStoreFactory =
+                    findAndCreateCatalogStoreFactory(configuration, 
classLoader);
+            CatalogStoreFactory.Context catalogStoreFactoryContext =
+                    buildCatalogStoreFactoryContext(configuration, 
classLoader);
+            catalogStoreFactory.open(catalogStoreFactoryContext);
+            CatalogStore catalogStore = 
catalogStoreFactory.createCatalogStore();
+            return new CatalogStoreResult(catalogStore, catalogStoreFactory);
+        }
+    }
+
+    /**
+     * Finds and creates a {@link CatalogStoreFactory} using the provided 
{@link Configuration} and
+     * user classloader.
+     *
+     * <p>The configuration format should be as follows:
+     *
+     * <pre>{@code
+     * table.catalog-store.kind: {identifier}
+     * table.catalog-store.{identifier}.{param1}: xxx
+     * table.catalog-store.{identifier}.{param2}: xxx
+     * }</pre>
+     */
+    public static CatalogStoreFactory findAndCreateCatalogStoreFactory(
+            Configuration configuration, ClassLoader classLoader) {
+        String identifier = 
configuration.get(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND);
+
+        CatalogStoreFactory catalogStoreFactory =
+                FactoryUtil.discoverFactory(classLoader, 
CatalogStoreFactory.class, identifier);
+
+        return catalogStoreFactory;
+    }
+
+    /**
+     * Build a {@link CatalogStoreFactory.Context} for opening the {@link 
CatalogStoreFactory}.
+     *
+     * <p>The configuration format should be as follows:
+     *
+     * <pre>{@code
+     * table.catalog-store.kind: {identifier}
+     * table.catalog-store.{identifier}.{param1}: xxx
+     * table.catalog-store.{identifier}.{param2}: xxx
+     * }</pre>
+     */
+    public static CatalogStoreFactory.Context buildCatalogStoreFactoryContext(
+            Configuration configuration, ClassLoader classLoader) {
+        String identifier = 
configuration.get(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND);
+        String catalogStoreOptionPrefix =
+                CommonCatalogOptions.TABLE_CATALOG_STORE_OPTION_PREFIX + 
identifier + ".";
+        Map<String, String> options =
+                new DelegatingConfiguration(configuration, 
catalogStoreOptionPrefix).toMap();
+        CatalogStoreFactory.Context context =
+                new FactoryUtil.DefaultCatalogStoreContext(options, 
configuration, classLoader);
+
+        return context;
+    }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
index ad681b54394..09e3951fe40 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
@@ -19,14 +19,11 @@
 package org.apache.flink.table.factories;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.DelegatingConfiguration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CommonCatalogOptions;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.listener.CatalogModificationListener;
@@ -42,10 +39,14 @@ import org.apache.flink.table.legacy.sources.TableSource;
 
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.stream.Collectors;
 
-/** Utility for dealing with {@link TableFactory} using the {@link 
TableFactoryService}. */
+/**
+ * Utility for dealing with {@link TableFactory} using the {@link 
TableFactoryService}.
+ *
+ * @deprecated Use {@link FactoryUtil} instead.
+ */
+@Deprecated
 @Internal
 public class TableFactoryUtil {
 
@@ -174,50 +175,4 @@ public class TableFactoryUtil {
                                                 }))
                 .collect(Collectors.toList());
     }
-
-    /**
-     * Finds and creates a {@link CatalogStoreFactory} using the provided 
{@link Configuration} and
-     * user classloader.
-     *
-     * <p>The configuration format should be as follows:
-     *
-     * <pre>{@code
-     * table.catalog-store.kind: {identifier}
-     * table.catalog-store.{identifier}.{param1}: xxx
-     * table.catalog-store.{identifier}.{param2}: xxx
-     * }</pre>
-     */
-    public static CatalogStoreFactory findAndCreateCatalogStoreFactory(
-            Configuration configuration, ClassLoader classLoader) {
-        String identifier = 
configuration.get(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND);
-
-        CatalogStoreFactory catalogStoreFactory =
-                FactoryUtil.discoverFactory(classLoader, 
CatalogStoreFactory.class, identifier);
-
-        return catalogStoreFactory;
-    }
-
-    /**
-     * Build a {@link CatalogStoreFactory.Context} for opening the {@link 
CatalogStoreFactory}.
-     *
-     * <p>The configuration format should be as follows:
-     *
-     * <pre>{@code
-     * table.catalog-store.kind: {identifier}
-     * table.catalog-store.{identifier}.{param1}: xxx
-     * table.catalog-store.{identifier}.{param2}: xxx
-     * }</pre>
-     */
-    public static CatalogStoreFactory.Context buildCatalogStoreFactoryContext(
-            Configuration configuration, ClassLoader classLoader) {
-        String identifier = 
configuration.get(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND);
-        String catalogStoreOptionPrefix =
-                CommonCatalogOptions.TABLE_CATALOG_STORE_OPTION_PREFIX + 
identifier + ".";
-        Map<String, String> options =
-                new DelegatingConfiguration(configuration, 
catalogStoreOptionPrefix).toMap();
-        CatalogStoreFactory.Context context =
-                new FactoryUtil.DefaultCatalogStoreContext(options, 
configuration, classLoader);
-
-        return context;
-    }
 }
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ApiFactoryUtilTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ApiFactoryUtilTest.java
new file mode 100644
index 00000000000..c8e1908291b
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ApiFactoryUtilTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.catalog.FileCatalogStoreFactory;
+import org.apache.flink.table.catalog.GenericInMemoryCatalogStoreFactory;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.File;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link ApiFactoryUtil}. */
+class ApiFactoryUtilTest {
+
+    @ParameterizedTest(name = "kind={0}, expectedFactory={1}")
+    @MethodSource("catalogStoreFactoryTestParameters")
+    void testFindAndCreateCatalogStoreFactory(String kind, Class<?> 
expectedFactoryClass) {
+        Configuration configuration = new Configuration();
+        if (kind != null) {
+            configuration.set(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND, 
kind);
+        }
+        ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
+
+        CatalogStoreFactory factory =
+                ApiFactoryUtil.findAndCreateCatalogStoreFactory(configuration, 
classLoader);
+
+        assertThat(factory).isInstanceOf(expectedFactoryClass);
+    }
+
+    @Test
+    void testBuildCatalogStoreFactoryContext(@TempDir File tempFolder) {
+        Configuration configuration = new Configuration();
+        configuration.set(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND, 
"file");
+        configuration.setString("table.catalog-store.file.path", 
tempFolder.getAbsolutePath());
+        configuration.setString("table.catalog-store.file.option1", "value1");
+        configuration.setString("table.catalog-store.file.option2", "value2");
+        ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
+
+        CatalogStoreFactory.Context context =
+                ApiFactoryUtil.buildCatalogStoreFactoryContext(configuration, 
classLoader);
+
+        assertThat(context).isNotNull();
+        assertThat(context.getOptions())
+                .containsExactlyInAnyOrderEntriesOf(
+                        Map.of(
+                                "path", tempFolder.getAbsolutePath(),
+                                "option1", "value1",
+                                "option2", "value2"));
+        assertThat(context.getConfiguration()).isEqualTo(configuration);
+        assertThat(context.getClassLoader()).isEqualTo(classLoader);
+    }
+
+    @Test
+    void testBuildCatalogStoreFactoryContextWithGenericInMemory() {
+        Configuration configuration = new Configuration();
+        configuration.set(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND, 
"generic_in_memory");
+        
configuration.setString("table.catalog-store.generic_in_memory.option1", 
"value1");
+        ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
+
+        CatalogStoreFactory.Context context =
+                ApiFactoryUtil.buildCatalogStoreFactoryContext(configuration, 
classLoader);
+
+        assertThat(context).isNotNull();
+        assertThat(context.getOptions())
+                .containsExactlyInAnyOrderEntriesOf(Map.of("option1", 
"value1"));
+        assertThat(context.getConfiguration()).isEqualTo(configuration);
+        assertThat(context.getClassLoader()).isEqualTo(classLoader);
+    }
+
+    @Test
+    void testBuildCatalogStoreFactoryContextWithoutOptions() {
+        Configuration configuration = new Configuration();
+        configuration.set(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND, 
"generic_in_memory");
+        ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
+
+        CatalogStoreFactory.Context context =
+                ApiFactoryUtil.buildCatalogStoreFactoryContext(configuration, 
classLoader);
+
+        assertThat(context).isNotNull();
+        assertThat(context.getOptions()).isEmpty();
+        assertThat(context.getConfiguration()).isEqualTo(configuration);
+        assertThat(context.getClassLoader()).isEqualTo(classLoader);
+    }
+
+    @Test
+    void testBuildCatalogStoreFactoryContextOnlyExtractsRelevantOptions() {
+        Configuration configuration = new Configuration();
+        configuration.set(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND, 
"file");
+        configuration.setString("table.catalog-store.file.path", "/test/path");
+        configuration.setString("table.catalog-store.file.option1", "value1");
+        configuration.setString("table.catalog-store.other.irrelevant", 
"should-not-appear");
+        configuration.setString("other.config.key", "should-not-appear");
+        ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
+
+        CatalogStoreFactory.Context context =
+                ApiFactoryUtil.buildCatalogStoreFactoryContext(configuration, 
classLoader);
+
+        assertThat(context).isNotNull();
+        assertThat(context.getOptions())
+                .containsExactlyInAnyOrderEntriesOf(
+                        Map.of("path", "/test/path", "option1", "value1"));
+    }
+
+    private static Stream<Arguments> catalogStoreFactoryTestParameters() {
+        return Stream.of(
+                Arguments.of("generic_in_memory", 
GenericInMemoryCatalogStoreFactory.class),
+                Arguments.of("file", FileCatalogStoreFactory.class),
+                Arguments.of(null, GenericInMemoryCatalogStoreFactory.class));
+    }
+}
diff --git 
a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
 
b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
index 7e86c5bf256..c8cc4e99463 100644
--- 
a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
+++ 
b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
@@ -28,7 +28,7 @@ import org.apache.flink.table.catalog._
 import org.apache.flink.table.connector.ChangelogMode
 import org.apache.flink.table.delegation.{Executor, Planner}
 import org.apache.flink.table.expressions.Expression
-import org.apache.flink.table.factories.{PlannerFactoryUtil, TableFactoryUtil}
+import org.apache.flink.table.factories.{ApiFactoryUtil, PlannerFactoryUtil, 
TableFactoryUtil}
 import org.apache.flink.table.functions.{AggregateFunction, 
TableAggregateFunction, TableFunction, UserDefinedFunctionHelper}
 import org.apache.flink.table.legacy.sources.TableSource
 import org.apache.flink.table.module.ModuleManager
@@ -250,14 +250,19 @@ object StreamTableEnvironmentImpl {
     val resourceManager = new ResourceManager(settings.getConfiguration, 
userClassLoader)
     val moduleManager = new ModuleManager
 
-    val catalogStoreFactory =
-      
TableFactoryUtil.findAndCreateCatalogStoreFactory(settings.getConfiguration, 
userClassLoader)
-    val catalogStoreFactoryContext =
-      
TableFactoryUtil.buildCatalogStoreFactoryContext(settings.getConfiguration, 
userClassLoader)
-    catalogStoreFactory.open(catalogStoreFactoryContext)
-    val catalogStore =
-      if (settings.getCatalogStore != null) settings.getCatalogStore
-      else catalogStoreFactory.createCatalogStore()
+    val (catalogStore, catalogStoreFactory) =
+      if (settings.getCatalogStore.isPresent) {
+        (settings.getCatalogStore.get(), null)
+      } else {
+        val factory =
+          ApiFactoryUtil.findAndCreateCatalogStoreFactory(
+            settings.getConfiguration,
+            userClassLoader)
+        val factoryContext =
+          
ApiFactoryUtil.buildCatalogStoreFactoryContext(settings.getConfiguration, 
userClassLoader)
+        factory.open(factoryContext)
+        (factory.createCatalogStore(), factory)
+      }
 
     val catalogManager = CatalogManager.newBuilder
       .classLoader(userClassLoader)

Reply via email to