This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3155daa9062 [FLINK-38263][table] Add SecretStore related interfaces
3155daa9062 is described below

commit 3155daa90629024176cc8773f45a9d99872899a6
Author: Hao Li <[email protected]>
AuthorDate: Tue Mar 3 08:50:52 2026 -0800

    [FLINK-38263][table] Add SecretStore related interfaces
    
    This closes #27394.
---
 .../test_environment_settings_completeness.py      |   4 +-
 .../flink/table/api/EnvironmentSettings.java       |  30 +++-
 .../table/api/internal/TableEnvironmentImpl.java   |   9 +
 .../flink/table/factories/ApiFactoryUtil.java      | 101 +++++++++++
 .../table/secret/GenericInMemorySecretStore.java   |  94 +++++++++++
 .../secret/GenericInMemorySecretStoreFactory.java  |  63 +++++++
 .../org.apache.flink.table.factories.Factory       |   1 +
 .../flink/table/factories/ApiFactoryUtilTest.java  |  77 +++++++++
 .../GenericInMemorySecretStoreFactoryTest.java     |  67 ++++++++
 .../secret/GenericInMemorySecretStoreTest.java     | 188 +++++++++++++++++++++
 .../apache/flink/table/factories/FactoryUtil.java  |  36 ++++
 .../flink/table/secret/CommonSecretOptions.java    |  41 +++++
 .../flink/table/secret/ReadableSecretStore.java    |  45 +++++
 .../org/apache/flink/table/secret/SecretStore.java |  33 ++++
 .../flink/table/secret/SecretStoreFactory.java     |  62 +++++++
 .../flink/table/secret/WritableSecretStore.java    |  64 +++++++
 .../table/secret/exceptions/SecretException.java   |  61 +++++++
 .../secret/exceptions/SecretNotFoundException.java |  53 ++++++
 18 files changed, 1025 insertions(+), 4 deletions(-)

diff --git 
a/flink-python/pyflink/table/tests/test_environment_settings_completeness.py 
b/flink-python/pyflink/table/tests/test_environment_settings_completeness.py
index 88da848c8ef..5ce757f2472 100644
--- a/flink-python/pyflink/table/tests/test_environment_settings_completeness.py
+++ b/flink-python/pyflink/table/tests/test_environment_settings_completeness.py
@@ -38,7 +38,7 @@ class 
EnvironmentSettingsCompletenessTests(PythonAPICompletenessTestCase, PyFlin
     def excluded_methods(cls):
         # internal interfaces, no need to expose to users.
         return {'getPlanner', 'getExecutor', 'getUserClassLoader', 
'getCatalogStore',
-                'toConfiguration', 'fromConfiguration', 'getSqlFactory'}
+                'getSecretStore', 'toConfiguration', 'fromConfiguration', 
'getSqlFactory'}
 
 
 class 
EnvironmentSettingsBuilderCompletenessTests(PythonAPICompletenessTestCase, 
PyFlinkTestCase):
@@ -59,7 +59,7 @@ class 
EnvironmentSettingsBuilderCompletenessTests(PythonAPICompletenessTestCase,
     def excluded_methods(cls):
         # internal interfaces, no need to expose to users.
         # withSqlFactory - needs to be implemented
-        return {'withClassLoader', 'withCatalogStore', 'withSqlFactory'}
+        return {'withClassLoader', 'withCatalogStore', 'withSecretStore', 
'withSqlFactory'}
 
 if __name__ == '__main__':
     import unittest
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
index e08704a9b99..92a375bf0bf 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.catalog.CatalogStore;
 import org.apache.flink.table.expressions.SqlFactory;
 import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.secret.SecretStore;
 
 import javax.annotation.Nullable;
 
@@ -66,16 +67,19 @@ public class EnvironmentSettings {
 
     private final @Nullable CatalogStore catalogStore;
     private final @Nullable SqlFactory sqlFactory;
+    private final @Nullable SecretStore secretStore;
 
     private EnvironmentSettings(
             Configuration configuration,
             ClassLoader classLoader,
             CatalogStore catalogStore,
-            SqlFactory sqlFactory) {
+            SqlFactory sqlFactory,
+            SecretStore secretStore) {
         this.configuration = configuration;
         this.classLoader = classLoader;
         this.catalogStore = catalogStore;
         this.sqlFactory = sqlFactory;
+        this.secretStore = secretStore;
     }
 
     /**
@@ -153,6 +157,11 @@ public class EnvironmentSettings {
         return Optional.ofNullable(sqlFactory);
     }
 
+    @Internal
+    public Optional<SecretStore> getSecretStore() {
+        return Optional.ofNullable(secretStore);
+    }
+
     /** A builder for {@link EnvironmentSettings}. */
     @PublicEvolving
     public static class Builder {
@@ -162,6 +171,7 @@ public class EnvironmentSettings {
 
         private @Nullable CatalogStore catalogStore;
         private @Nullable SqlFactory sqlFactory;
+        private @Nullable SecretStore secretStore;
 
         public Builder() {}
 
@@ -250,12 +260,28 @@ public class EnvironmentSettings {
             return this;
         }
 
+        /**
+         * Specifies the {@link SecretStore} to be used for managing secrets 
in the {@link
+         * TableEnvironment}.
+         *
+         * <p>The secret store allows for secure storage and retrieval of 
sensitive configuration
+         * data such as credentials, tokens, and passwords.
+         *
+         * @param secretStore the secret store instance to use
+         * @return this builder
+         */
+        public Builder withSecretStore(SecretStore secretStore) {
+            this.secretStore = secretStore;
+            return this;
+        }
+
         /** Returns an immutable instance of {@link EnvironmentSettings}. */
         public EnvironmentSettings build() {
             if (classLoader == null) {
                 classLoader = Thread.currentThread().getContextClassLoader();
             }
-            return new EnvironmentSettings(configuration, classLoader, 
catalogStore, sqlFactory);
+            return new EnvironmentSettings(
+                    configuration, classLoader, catalogStore, sqlFactory, 
secretStore);
         }
     }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 388b4f969db..4743663e8a4 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -120,6 +120,8 @@ import 
org.apache.flink.table.operations.utils.OperationTreeBuilder;
 import org.apache.flink.table.resource.ResourceManager;
 import org.apache.flink.table.resource.ResourceType;
 import org.apache.flink.table.resource.ResourceUri;
+import org.apache.flink.table.secret.SecretStore;
+import org.apache.flink.table.secret.SecretStoreFactory;
 import org.apache.flink.table.types.AbstractDataType;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.utils.DataTypeUtils;
@@ -258,6 +260,13 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
         final CatalogStore catalogStore = catalogStoreResult.getCatalogStore();
         final CatalogStoreFactory catalogStoreFactory = 
catalogStoreResult.getCatalogStoreFactory();
 
+        final ApiFactoryUtil.SecretStoreResult secretStoreResult =
+                ApiFactoryUtil.getOrCreateSecretStore(
+                        settings.getSecretStore(), 
settings.getConfiguration(), userClassLoader);
+        final SecretStore secretStore = secretStoreResult.getSecretStore();
+        final SecretStoreFactory secretStoreFactory = 
secretStoreResult.getSecretStoreFactory();
+        // TODO (FLINK-38261): pass secret store to catalog manager for 
encryption/decryption
+
         // use configuration to init table config
         final TableConfig tableConfig = TableConfig.getDefault();
         tableConfig.setRootConfiguration(executor.getConfiguration());
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ApiFactoryUtil.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ApiFactoryUtil.java
index cb3b9189d46..5062db03791 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ApiFactoryUtil.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ApiFactoryUtil.java
@@ -19,10 +19,14 @@
 package org.apache.flink.table.factories;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DelegatingConfiguration;
 import org.apache.flink.table.catalog.CatalogStore;
 import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.secret.CommonSecretOptions;
+import org.apache.flink.table.secret.SecretStore;
+import org.apache.flink.table.secret.SecretStoreFactory;
 
 import javax.annotation.Nullable;
 
@@ -127,4 +131,101 @@ public class ApiFactoryUtil {
 
         return context;
     }
+
+    /** Result holder for secret store and factory. */
+    @Internal
+    public static class SecretStoreResult {
+        private final SecretStore secretStore;
+        @Nullable private final SecretStoreFactory secretStoreFactory;
+
+        public SecretStoreResult(
+                SecretStore secretStore, @Nullable SecretStoreFactory 
secretStoreFactory) {
+            this.secretStore = secretStore;
+            this.secretStoreFactory = secretStoreFactory;
+        }
+
+        public SecretStore getSecretStore() {
+            return secretStore;
+        }
+
+        @Nullable
+        public SecretStoreFactory getSecretStoreFactory() {
+            return secretStoreFactory;
+        }
+    }
+
+    /**
+     * Gets or creates a {@link SecretStore}. If a secret store is provided in 
settings, it will be
+     * used directly. Otherwise, a new secret store will be created using the 
factory.
+     *
+     * @param providedSecretStore the secret store from settings, if present
+     * @param configuration the configuration
+     * @param classLoader the user classloader
+     * @return a result containing the secret store and factory (factory is 
null if store was
+     *     provided)
+     */
+    public static SecretStoreResult getOrCreateSecretStore(
+            Optional<SecretStore> providedSecretStore,
+            Configuration configuration,
+            ClassLoader classLoader) {
+        if (providedSecretStore.isPresent()) {
+            return new SecretStoreResult(providedSecretStore.get(), null);
+        } else {
+            SecretStoreFactory secretStoreFactory =
+                    findAndCreateSecretStoreFactory(configuration, 
classLoader);
+            SecretStoreFactory.Context secretStoreFactoryContext =
+                    buildSecretStoreFactoryContext(configuration, classLoader);
+            secretStoreFactory.open(secretStoreFactoryContext);
+            SecretStore secretStore = secretStoreFactory.createSecretStore();
+            return new SecretStoreResult(secretStore, secretStoreFactory);
+        }
+    }
+
+    /**
+     * Finds and creates a {@link SecretStoreFactory} using the provided 
{@link Configuration} and
+     * user classloader.
+     *
+     * <p>The configuration format should be as follows:
+     *
+     * <pre>{@code
+     * table.secret-store.kind: {identifier}
+     * table.secret-store.{identifier}.{param1}: xxx
+     * table.secret-store.{identifier}.{param2}: xxx
+     * }</pre>
+     */
+    @VisibleForTesting
+    static SecretStoreFactory findAndCreateSecretStoreFactory(
+            Configuration configuration, ClassLoader classLoader) {
+        String identifier = 
configuration.get(CommonSecretOptions.TABLE_SECRET_STORE_KIND);
+
+        SecretStoreFactory secretStoreFactory =
+                FactoryUtil.discoverFactory(classLoader, 
SecretStoreFactory.class, identifier);
+
+        return secretStoreFactory;
+    }
+
+    /**
+     * Build a {@link SecretStoreFactory.Context} for opening the {@link 
SecretStoreFactory}.
+     *
+     * <p>The configuration format should be as follows:
+     *
+     * <pre>{@code
+     * table.secret-store.kind: {identifier}
+     * table.secret-store.{identifier}.{param1}: xxx
+     * table.secret-store.{identifier}.{param2}: xxx
+     * }</pre>
+     */
+    @VisibleForTesting
+    static SecretStoreFactory.Context buildSecretStoreFactoryContext(
+            Configuration configuration, ClassLoader classLoader) {
+        String identifier = 
configuration.get(CommonSecretOptions.TABLE_SECRET_STORE_KIND);
+        String secretStoreOptionPrefix =
+                CommonSecretOptions.TABLE_SECRET_STORE_OPTION_PREFIX + 
identifier + ".";
+        Map<String, String> options =
+                new DelegatingConfiguration(configuration, 
secretStoreOptionPrefix).toMap();
+        SecretStoreFactory.Context context =
+                new FactoryUtil.DefaultSecretStoreContext(options, 
configuration, classLoader);
+
+        return context;
+    }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStore.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStore.java
new file mode 100644
index 00000000000..7554bb789e6
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStore.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.secret;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.secret.exceptions.SecretNotFoundException;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A generic in-memory implementation of both {@link ReadableSecretStore} and 
{@link
+ * WritableSecretStore}.
+ *
+ * <p>This implementation stores secrets in memory as immutable Map objects. 
It is suitable for
+ * testing and development purposes but should not be used in production 
environments as secrets are
+ * not encrypted.
+ */
+@Internal
+public class GenericInMemorySecretStore implements ReadableSecretStore, 
WritableSecretStore {
+
+    private final Map<String, Map<String, String>> secrets;
+
+    public GenericInMemorySecretStore() {
+        this.secrets = new HashMap<>();
+    }
+
+    @Override
+    public Map<String, String> getSecret(String secretId) throws 
SecretNotFoundException {
+        checkNotNull(secretId, "Secret ID cannot be null");
+
+        Map<String, String> secretData = secrets.get(secretId);
+        if (secretData == null) {
+            throw new SecretNotFoundException(
+                    String.format("Secret with ID '%s' not found", secretId));
+        }
+
+        return secretData;
+    }
+
+    @Override
+    public String storeSecret(Map<String, String> secretData) {
+        checkNotNull(secretData, "Secret data cannot be null");
+
+        String secretId = UUID.randomUUID().toString();
+        secrets.put(secretId, Collections.unmodifiableMap(new 
HashMap<>(secretData)));
+        return secretId;
+    }
+
+    @Override
+    public void removeSecret(String secretId) {
+        checkNotNull(secretId, "Secret ID cannot be null");
+        secrets.remove(secretId);
+    }
+
+    @Override
+    public void updateSecret(String secretId, Map<String, String> 
newSecretData)
+            throws SecretNotFoundException {
+        checkNotNull(secretId, "Secret ID cannot be null");
+        checkNotNull(newSecretData, "New secret data cannot be null");
+
+        if (!secrets.containsKey(secretId)) {
+            throw new SecretNotFoundException(
+                    String.format("Secret with ID '%s' not found", secretId));
+        }
+
+        secrets.put(secretId, Collections.unmodifiableMap(new 
HashMap<>(newSecretData)));
+    }
+
+    /** Clears all secrets from the store (for testing purposes). */
+    void clear() {
+        secrets.clear();
+    }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStoreFactory.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStoreFactory.java
new file mode 100644
index 00000000000..bbccfd4ec2f
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStoreFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.secret;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+
+import java.util.Set;
+
+/**
+ * Factory for creating {@link GenericInMemorySecretStore} instances.
+ *
+ * <p>This factory creates in-memory secret stores that are suitable for 
testing and development
+ * purposes. Secrets are stored in plaintext JSON format in memory and are not 
persisted.
+ */
+@Internal
+public class GenericInMemorySecretStoreFactory implements SecretStoreFactory {
+
+    public static final String IDENTIFIER = "generic_in_memory";
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        return Set.of();
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        return Set.of();
+    }
+
+    @Override
+    public SecretStore createSecretStore() {
+        return new GenericInMemorySecretStore();
+    }
+
+    @Override
+    public void open(Context context) {}
+
+    @Override
+    public void close() throws CatalogException {}
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/flink-table/flink-table-api-java/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index d86ead55ad6..8191a71a678 100644
--- 
a/flink-table/flink-table-api-java/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ 
b/flink-table/flink-table-api-java/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -16,3 +16,4 @@
 org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
 org.apache.flink.table.catalog.GenericInMemoryCatalogStoreFactory
 org.apache.flink.table.catalog.FileCatalogStoreFactory
+org.apache.flink.table.secret.GenericInMemorySecretStoreFactory
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ApiFactoryUtilTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ApiFactoryUtilTest.java
index c8e1908291b..249fd16bbc8 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ApiFactoryUtilTest.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ApiFactoryUtilTest.java
@@ -22,6 +22,9 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.catalog.CommonCatalogOptions;
 import org.apache.flink.table.catalog.FileCatalogStoreFactory;
 import org.apache.flink.table.catalog.GenericInMemoryCatalogStoreFactory;
+import org.apache.flink.table.secret.CommonSecretOptions;
+import org.apache.flink.table.secret.GenericInMemorySecretStoreFactory;
+import org.apache.flink.table.secret.SecretStoreFactory;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -133,4 +136,78 @@ class ApiFactoryUtilTest {
                 Arguments.of("file", FileCatalogStoreFactory.class),
                 Arguments.of(null, GenericInMemoryCatalogStoreFactory.class));
     }
+
+    @ParameterizedTest(name = "kind={0}, expectedFactory={1}")
+    @MethodSource("secretStoreFactoryTestParameters")
+    void testFindAndCreateSecretStoreFactory(String kind, Class<?> 
expectedFactoryClass) {
+        Configuration configuration = new Configuration();
+        if (kind != null) {
+            configuration.set(CommonSecretOptions.TABLE_SECRET_STORE_KIND, 
kind);
+        }
+        ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
+
+        SecretStoreFactory factory =
+                ApiFactoryUtil.findAndCreateSecretStoreFactory(configuration, 
classLoader);
+
+        assertThat(factory).isInstanceOf(expectedFactoryClass);
+    }
+
+    @Test
+    void testBuildSecretStoreFactoryContext() {
+        Configuration configuration = new Configuration();
+        configuration.set(CommonSecretOptions.TABLE_SECRET_STORE_KIND, 
"generic_in_memory");
+        
configuration.setString("table.secret-store.generic_in_memory.option1", 
"value1");
+        
configuration.setString("table.secret-store.generic_in_memory.option2", 
"value2");
+        ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
+
+        SecretStoreFactory.Context context =
+                ApiFactoryUtil.buildSecretStoreFactoryContext(configuration, 
classLoader);
+
+        assertThat(context).isNotNull();
+        assertThat(context.getOptions())
+                .containsExactlyInAnyOrderEntriesOf(
+                        Map.of("option1", "value1", "option2", "value2"));
+        assertThat(context.getConfiguration()).isEqualTo(configuration);
+        assertThat(context.getClassLoader()).isEqualTo(classLoader);
+    }
+
+    @Test
+    void testBuildSecretStoreFactoryContextWithoutOptions() {
+        Configuration configuration = new Configuration();
+        configuration.set(CommonSecretOptions.TABLE_SECRET_STORE_KIND, 
"generic_in_memory");
+        ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
+
+        SecretStoreFactory.Context context =
+                ApiFactoryUtil.buildSecretStoreFactoryContext(configuration, 
classLoader);
+
+        assertThat(context).isNotNull();
+        assertThat(context.getOptions()).isEmpty();
+        assertThat(context.getConfiguration()).isEqualTo(configuration);
+        assertThat(context.getClassLoader()).isEqualTo(classLoader);
+    }
+
+    @Test
+    void testBuildSecretStoreFactoryContextOnlyExtractsRelevantOptions() {
+        Configuration configuration = new Configuration();
+        configuration.set(CommonSecretOptions.TABLE_SECRET_STORE_KIND, 
"generic_in_memory");
+        
configuration.setString("table.secret-store.generic_in_memory.option1", 
"value1");
+        
configuration.setString("table.secret-store.generic_in_memory.option2", 
"value2");
+        configuration.setString("table.secret-store.other.irrelevant", 
"should-not-appear");
+        configuration.setString("other.config.key", "should-not-appear");
+        ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
+
+        SecretStoreFactory.Context context =
+                ApiFactoryUtil.buildSecretStoreFactoryContext(configuration, 
classLoader);
+
+        assertThat(context).isNotNull();
+        assertThat(context.getOptions())
+                .containsExactlyInAnyOrderEntriesOf(
+                        Map.of("option1", "value1", "option2", "value2"));
+    }
+
+    private static Stream<Arguments> secretStoreFactoryTestParameters() {
+        return Stream.of(
+                Arguments.of("generic_in_memory", 
GenericInMemorySecretStoreFactory.class),
+                Arguments.of(null, GenericInMemorySecretStoreFactory.class));
+    }
 }
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/secret/GenericInMemorySecretStoreFactoryTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/secret/GenericInMemorySecretStoreFactoryTest.java
new file mode 100644
index 00000000000..a9351143306
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/secret/GenericInMemorySecretStoreFactoryTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.secret;
+
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.FactoryUtil.DefaultSecretStoreContext;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link GenericInMemorySecretStoreFactory}. */
+class GenericInMemorySecretStoreFactoryTest {
+
+    @Test
+    void testSecretStoreInit() {
+        String factoryIdentifier = 
GenericInMemorySecretStoreFactory.IDENTIFIER;
+        Map<String, String> options = new HashMap<>();
+        ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
+        final DefaultSecretStoreContext discoveryContext =
+                new DefaultSecretStoreContext(options, null, classLoader);
+        final SecretStoreFactory factory =
+                FactoryUtil.discoverFactory(
+                        classLoader, SecretStoreFactory.class, 
factoryIdentifier);
+        try {
+            factory.open(discoveryContext);
+            SecretStore secretStore = factory.createSecretStore();
+            assertThat(secretStore instanceof 
GenericInMemorySecretStore).isTrue();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        } finally {
+            factory.close();
+        }
+    }
+
+    @Test
+    void testFactoryIdentifier() {
+        GenericInMemorySecretStoreFactory factory = new 
GenericInMemorySecretStoreFactory();
+        assertThat(factory.factoryIdentifier()).isEqualTo("generic_in_memory");
+    }
+
+    @Test
+    void testRequiredAndOptionalOptions() {
+        GenericInMemorySecretStoreFactory factory = new 
GenericInMemorySecretStoreFactory();
+        assertThat(factory.requiredOptions()).isEmpty();
+        assertThat(factory.optionalOptions()).isEmpty();
+    }
+}
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/secret/GenericInMemorySecretStoreTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/secret/GenericInMemorySecretStoreTest.java
new file mode 100644
index 00000000000..53674e1e5c0
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/secret/GenericInMemorySecretStoreTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.secret;
+
+import org.apache.flink.table.secret.exceptions.SecretNotFoundException;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+
+/** Test for {@link GenericInMemorySecretStore}. */
+class GenericInMemorySecretStoreTest {
+
+    private GenericInMemorySecretStore secretStore;
+
+    @BeforeEach
+    void setUp() {
+        secretStore = new GenericInMemorySecretStore();
+    }
+
+    @Test
+    void testStoreAndGetSecret() throws SecretNotFoundException {
+        Map<String, String> secretData = Map.of("username", "testuser", 
"password", "testpass");
+
+        String secretId = secretStore.storeSecret(secretData);
+        assertThat(secretId).isNotNull();
+
+        Map<String, String> retrievedSecret = secretStore.getSecret(secretId);
+        assertThat(retrievedSecret).isNotNull();
+        assertThat(retrievedSecret.get("username")).isEqualTo("testuser");
+        assertThat(retrievedSecret.get("password")).isEqualTo("testpass");
+    }
+
+    @Test
+    void testGetNonExistentSecret() {
+        assertThatThrownBy(() -> secretStore.getSecret("non-existent-id"))
+                .isInstanceOf(SecretNotFoundException.class)
+                .hasMessageContaining("Secret with ID 'non-existent-id' not 
found");
+    }
+
+    @Test
+    void testGetSecretWithNullId() {
+        assertThatThrownBy(() -> secretStore.getSecret(null))
+                .isInstanceOf(NullPointerException.class)
+                .hasMessageContaining("Secret ID cannot be null");
+    }
+
+    @Test
+    void testStoreSecretWithNullData() {
+        assertThatThrownBy(() -> secretStore.storeSecret(null))
+                .isInstanceOf(NullPointerException.class)
+                .hasMessageContaining("Secret data cannot be null");
+    }
+
+    @Test
+    void testRemoveSecret() throws SecretNotFoundException {
+        Map<String, String> secretData = Map.of("key", "value");
+
+        String secretId = secretStore.storeSecret(secretData);
+        assertThat(secretStore.getSecret(secretId)).isNotNull();
+
+        secretStore.removeSecret(secretId);
+        assertThatThrownBy(() -> secretStore.getSecret(secretId))
+                .isInstanceOf(SecretNotFoundException.class)
+                .hasMessageContaining("Secret with ID '" + secretId + "' not 
found");
+    }
+
+    @Test
+    void testRemoveSecretWithNullId() {
+        assertThatThrownBy(() -> secretStore.removeSecret(null))
+                .isInstanceOf(NullPointerException.class)
+                .hasMessageContaining("Secret ID cannot be null");
+    }
+
+    @Test
+    void testRemoveNonExistentSecret() {
+        // Should not throw exception, just silently remove nothing
+        assertDoesNotThrow(() -> secretStore.removeSecret("non-existent-id"));
+    }
+
+    @Test
+    void testUpdateSecret() throws SecretNotFoundException {
+        Map<String, String> originalData = Map.of("username", "olduser", 
"password", "oldpass");
+
+        String secretId = secretStore.storeSecret(originalData);
+
+        Map<String, String> updatedData = Map.of("username", "newuser", 
"password", "newpass");
+
+        secretStore.updateSecret(secretId, updatedData);
+
+        Map<String, String> retrievedSecret = secretStore.getSecret(secretId);
+        assertThat(retrievedSecret.get("username")).isEqualTo("newuser");
+        assertThat(retrievedSecret.get("password")).isEqualTo("newpass");
+    }
+
+    @Test
+    void testUpdateNonExistentSecret() {
+        Map<String, String> secretData = Map.of("key", "value");
+
+        assertThatThrownBy(() -> secretStore.updateSecret("non-existent-id", 
secretData))
+                .isInstanceOf(SecretNotFoundException.class)
+                .hasMessageContaining("Secret with ID 'non-existent-id' not 
found");
+    }
+
+    @Test
+    void testUpdateSecretWithNullId() {
+        Map<String, String> secretData = Map.of("key", "value");
+
+        assertThatThrownBy(() -> secretStore.updateSecret(null, secretData))
+                .isInstanceOf(NullPointerException.class)
+                .hasMessageContaining("Secret ID cannot be null");
+    }
+
+    @Test
+    void testUpdateSecretWithNullData() {
+        Map<String, String> originalData = Map.of("key", "value");
+        String secretId = secretStore.storeSecret(originalData);
+
+        assertThatThrownBy(() -> secretStore.updateSecret(secretId, null))
+                .isInstanceOf(NullPointerException.class)
+                .hasMessageContaining("New secret data cannot be null");
+    }
+
+    @Test
+    void testClear() {
+        Map<String, String> secretData1 = Map.of("key1", "value1");
+        String secretId1 = secretStore.storeSecret(secretData1);
+
+        Map<String, String> secretData2 = Map.of("key2", "value2");
+        String secretId2 = secretStore.storeSecret(secretData2);
+
+        secretStore.clear();
+
+        assertThatThrownBy(() -> secretStore.getSecret(secretId1))
+                .isInstanceOf(SecretNotFoundException.class);
+        assertThatThrownBy(() -> secretStore.getSecret(secretId2))
+                .isInstanceOf(SecretNotFoundException.class);
+    }
+
+    @Test
+    void testStoreEmptySecret() throws SecretNotFoundException {
+        Map<String, String> emptyData = Map.of();
+        String secretId = secretStore.storeSecret(emptyData);
+
+        Map<String, String> retrievedSecret = secretStore.getSecret(secretId);
+        assertThat(retrievedSecret).isNotNull();
+        assertThat(retrievedSecret).isEmpty();
+    }
+
+    @Test
+    void testStoreMultipleSecrets() throws SecretNotFoundException {
+        Map<String, String> secret1 = Map.of("user1", "pass1");
+
+        Map<String, String> secret2 = Map.of("user2", "pass2");
+
+        String secretId1 = secretStore.storeSecret(secret1);
+        String secretId2 = secretStore.storeSecret(secret2);
+
+        assertThat(secretId1).isNotEqualTo(secretId2);
+
+        Map<String, String> retrieved1 = secretStore.getSecret(secretId1);
+        Map<String, String> retrieved2 = secretStore.getSecret(secretId2);
+
+        assertThat(retrieved1.get("user1")).isEqualTo("pass1");
+        assertThat(retrieved2.get("user2")).isEqualTo("pass2");
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
index 691397c08d4..594a7bf1d5d 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
@@ -42,6 +42,7 @@ import 
org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.legacy.factories.TableFactory;
 import org.apache.flink.table.ml.ModelProvider;
 import org.apache.flink.table.module.Module;
+import org.apache.flink.table.secret.SecretStoreFactory;
 import org.apache.flink.table.utils.EncodingUtils;
 import org.apache.flink.table.watermark.WatermarkEmitStrategy;
 import org.apache.flink.util.Preconditions;
@@ -1411,6 +1412,41 @@ public final class FactoryUtil {
         }
     }
 
+    /** Default implementation of {@link SecretStoreFactory.Context}. */
+    @Internal
+    public static class DefaultSecretStoreContext implements 
SecretStoreFactory.Context {
+
+        private Map<String, String> options;
+
+        private ReadableConfig configuration;
+
+        private ClassLoader classLoader;
+
+        public DefaultSecretStoreContext(
+                Map<String, String> options,
+                ReadableConfig configuration,
+                ClassLoader classLoader) {
+            this.options = options;
+            this.configuration = configuration;
+            this.classLoader = classLoader;
+        }
+
+        @Override
+        public Map<String, String> getOptions() {
+            return options;
+        }
+
+        @Override
+        public ReadableConfig getConfiguration() {
+            return configuration;
+        }
+
+        @Override
+        public ClassLoader getClassLoader() {
+            return classLoader;
+        }
+    }
+
     /** Default implementation of {@link ModuleFactory.Context}. */
     @Internal
     public static class DefaultModuleContext implements ModuleFactory.Context {
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/CommonSecretOptions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/CommonSecretOptions.java
new file mode 100644
index 00000000000..461909a6cd1
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/CommonSecretOptions.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.secret;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/** A collection of {@link ConfigOption} which are used for secret store 
configuration. */
+@Internal
+public class CommonSecretOptions {
+
+    public static final String DEFAULT_SECRET_STORE_KIND = "generic_in_memory";
+    public static final ConfigOption<String> TABLE_SECRET_STORE_KIND =
+            ConfigOptions.key("table.secret-store.kind")
+                    .stringType()
+                    .defaultValue(DEFAULT_SECRET_STORE_KIND)
+                    .withDescription(
+                            "The kind of secret store to be used. Out of the 
box, 'generic_in_memory' option is supported. "
+                                    + "Implementations can provide custom 
secret stores for different backends "
+                                    + "(e.g., cloud-specific secret 
managers).");
+
+    /** Used to filter the specific options for secret store. */
+    public static final String TABLE_SECRET_STORE_OPTION_PREFIX = 
"table.secret-store.";
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/ReadableSecretStore.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/ReadableSecretStore.java
new file mode 100644
index 00000000000..70dd48e21f9
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/ReadableSecretStore.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.secret;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.secret.exceptions.SecretNotFoundException;
+
+import java.util.Map;
+
+/**
+ * Interface for retrieving secrets from a secret store.
+ *
+ * <p>This interface enables read-only access to stored secrets, allowing 
applications to retrieve
+ * sensitive configuration data such as credentials, API tokens, and passwords.
+ *
+ * <p>Implementations of this interface should ensure secure retrieval and 
handling of secret data.
+ */
+@PublicEvolving
+public interface ReadableSecretStore extends SecretStore {
+
+    /**
+     * Retrieves a secret from the store by its identifier.
+     *
+     * @param secretId the unique identifier of the secret to retrieve
+     * @return a map containing the secret data as key-value pairs
+     * @throws SecretNotFoundException if the secret with the given identifier 
does not exist
+     */
+    Map<String, String> getSecret(String secretId) throws 
SecretNotFoundException;
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/SecretStore.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/SecretStore.java
new file mode 100644
index 00000000000..89af1d469e1
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/SecretStore.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.secret;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Base marker interface for secret store implementations.
+ *
+ * <p>This interface serves as the common base for both {@link 
ReadableSecretStore} and {@link
+ * WritableSecretStore}, allowing for flexible secret management 
implementations.
+ *
+ * <p>Secret stores are used to manage sensitive configuration data 
(credentials, tokens, passwords,
+ * etc.) in Flink SQL and Table API applications.
+ */
+@PublicEvolving
+public interface SecretStore {}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/SecretStoreFactory.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/SecretStoreFactory.java
new file mode 100644
index 00000000000..1fd40b8e78b
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/SecretStoreFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.secret;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.secret.exceptions.SecretException;
+
+import java.util.Map;
+
+/** Factory for creating SecretStore instances. */
+@PublicEvolving
+public interface SecretStoreFactory extends Factory {
+
+    /** Creates a SecretStore instance. */
+    SecretStore createSecretStore();
+
+    /** Initialize secret store. */
+    void open(Context context) throws SecretException;
+
+    /** Close secret store. */
+    void close() throws CatalogException;
+
+    /** Context for creating a SecretStore. */
+    @PublicEvolving
+    interface Context {
+        /**
+         * Returns the options with which the secret store is created.
+         *
+         * <p>An implementation should perform validation of these options.
+         */
+        Map<String, String> getOptions();
+
+        /** Gives read-only access to the configuration of the current 
session. */
+        ReadableConfig getConfiguration();
+
+        /**
+         * Returns the class loader of the current session.
+         *
+         * <p>The class loader is in particular useful for discovering further 
(nested) factories.
+         */
+        ClassLoader getClassLoader();
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/WritableSecretStore.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/WritableSecretStore.java
new file mode 100644
index 00000000000..db5037b7b2f
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/WritableSecretStore.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.secret;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.secret.exceptions.SecretNotFoundException;
+
+import java.util.Map;
+
+/**
+ * Interface for storing, updating, and removing secrets in a secret store.
+ *
+ * <p>This interface provides write operations for managing secrets, including 
adding new secrets,
+ * updating existing ones, and removing secrets that are no longer needed.
+ *
+ * <p>Implementations should ensure that secret operations are performed 
securely and, where
+ * applicable, atomically.
+ */
+@PublicEvolving
+public interface WritableSecretStore extends SecretStore {
+
+    /**
+     * Stores a new secret in the secret store.
+     *
+     * @param secretData a map containing the secret data as key-value pairs 
to be stored
+     * @return a unique identifier for the stored secret
+     */
+    String storeSecret(Map<String, String> secretData);
+
+    /**
+     * Removes a secret from the secret store.
+     *
+     * @param secretId the unique identifier of the secret to remove
+     */
+    void removeSecret(String secretId);
+
+    /**
+     * Atomically updates an existing secret with new data.
+     *
+     * <p>This operation replaces the entire secret data with the provided new 
data.
+     *
+     * @param secretId the unique identifier of the secret to update
+     * @param newSecretData a map containing the new secret data as key-value 
pairs
+     * @throws SecretNotFoundException if the secret with the given identifier 
does not exist
+     */
+    void updateSecret(String secretId, Map<String, String> newSecretData)
+            throws SecretNotFoundException;
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/exceptions/SecretException.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/exceptions/SecretException.java
new file mode 100644
index 00000000000..b856938ba6a
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/exceptions/SecretException.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.secret.exceptions;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Base exception for all secret-related errors.
+ *
+ * <p>This exception serves as the parent class for all secret related 
exceptions, providing a
+ * common type for handling errors that occur during secret management 
operations.
+ */
+@PublicEvolving
+public class SecretException extends RuntimeException {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Constructs a new SecretException with the specified detail message.
+     *
+     * @param message the detail message explaining the reason for the 
exception
+     */
+    public SecretException(String message) {
+        super(message);
+    }
+
+    /**
+     * Constructs a new SecretException with the specified detail message and 
cause.
+     *
+     * @param message the detail message explaining the reason for the 
exception
+     * @param cause the cause of the exception
+     */
+    public SecretException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Constructs a new SecretException with the specified cause.
+     *
+     * @param cause the cause of the exception
+     */
+    public SecretException(Throwable cause) {
+        super(cause);
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/exceptions/SecretNotFoundException.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/exceptions/SecretNotFoundException.java
new file mode 100644
index 00000000000..a773ea7e1e6
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/exceptions/SecretNotFoundException.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.secret.exceptions;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Exception thrown when a requested secret cannot be found in the secret 
store.
+ *
+ * <p>This exception is typically thrown by {@link 
ReadableSecretStore#getSecret(String)} or {@link
+ * WritableSecretStore#updateSecret(String, java.util.Map)} when attempting to 
access or modify a
+ * secret that does not exist.
+ */
+@PublicEvolving
+public class SecretNotFoundException extends Exception {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Constructs a new SecretNotFoundException with the specified detail 
message.
+     *
+     * @param message the detail message explaining the reason for the 
exception
+     */
+    public SecretNotFoundException(String message) {
+        super(message);
+    }
+
+    /**
+     * Constructs a new SecretNotFoundException with the specified detail 
message and cause.
+     *
+     * @param message the detail message explaining the reason for the 
exception
+     * @param cause the cause of the exception
+     */
+    public SecretNotFoundException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}

Reply via email to