This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3155daa9062 [FLINK-38263][table] Add SecretStore related interfaces
3155daa9062 is described below
commit 3155daa90629024176cc8773f45a9d99872899a6
Author: Hao Li <[email protected]>
AuthorDate: Tue Mar 3 08:50:52 2026 -0800
[FLINK-38263][table] Add SecretStore related interfaces
This closes #27394.
---
.../test_environment_settings_completeness.py | 4 +-
.../flink/table/api/EnvironmentSettings.java | 30 +++-
.../table/api/internal/TableEnvironmentImpl.java | 9 +
.../flink/table/factories/ApiFactoryUtil.java | 101 +++++++++++
.../table/secret/GenericInMemorySecretStore.java | 94 +++++++++++
.../secret/GenericInMemorySecretStoreFactory.java | 63 +++++++
.../org.apache.flink.table.factories.Factory | 1 +
.../flink/table/factories/ApiFactoryUtilTest.java | 77 +++++++++
.../GenericInMemorySecretStoreFactoryTest.java | 67 ++++++++
.../secret/GenericInMemorySecretStoreTest.java | 188 +++++++++++++++++++++
.../apache/flink/table/factories/FactoryUtil.java | 36 ++++
.../flink/table/secret/CommonSecretOptions.java | 41 +++++
.../flink/table/secret/ReadableSecretStore.java | 45 +++++
.../org/apache/flink/table/secret/SecretStore.java | 33 ++++
.../flink/table/secret/SecretStoreFactory.java | 62 +++++++
.../flink/table/secret/WritableSecretStore.java | 64 +++++++
.../table/secret/exceptions/SecretException.java | 61 +++++++
.../secret/exceptions/SecretNotFoundException.java | 53 ++++++
18 files changed, 1025 insertions(+), 4 deletions(-)
diff --git
a/flink-python/pyflink/table/tests/test_environment_settings_completeness.py
b/flink-python/pyflink/table/tests/test_environment_settings_completeness.py
index 88da848c8ef..5ce757f2472 100644
--- a/flink-python/pyflink/table/tests/test_environment_settings_completeness.py
+++ b/flink-python/pyflink/table/tests/test_environment_settings_completeness.py
@@ -38,7 +38,7 @@ class
EnvironmentSettingsCompletenessTests(PythonAPICompletenessTestCase, PyFlin
def excluded_methods(cls):
# internal interfaces, no need to expose to users.
return {'getPlanner', 'getExecutor', 'getUserClassLoader',
'getCatalogStore',
- 'toConfiguration', 'fromConfiguration', 'getSqlFactory'}
+ 'getSecretStore', 'toConfiguration', 'fromConfiguration',
'getSqlFactory'}
class
EnvironmentSettingsBuilderCompletenessTests(PythonAPICompletenessTestCase,
PyFlinkTestCase):
@@ -59,7 +59,7 @@ class
EnvironmentSettingsBuilderCompletenessTests(PythonAPICompletenessTestCase,
def excluded_methods(cls):
# internal interfaces, no need to expose to users.
# withSqlFactory - needs to be implemented
- return {'withClassLoader', 'withCatalogStore', 'withSqlFactory'}
+ return {'withClassLoader', 'withCatalogStore', 'withSecretStore',
'withSqlFactory'}
if __name__ == '__main__':
import unittest
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
index e08704a9b99..92a375bf0bf 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.catalog.CatalogStore;
import org.apache.flink.table.expressions.SqlFactory;
import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.secret.SecretStore;
import javax.annotation.Nullable;
@@ -66,16 +67,19 @@ public class EnvironmentSettings {
private final @Nullable CatalogStore catalogStore;
private final @Nullable SqlFactory sqlFactory;
+ private final @Nullable SecretStore secretStore;
private EnvironmentSettings(
Configuration configuration,
ClassLoader classLoader,
CatalogStore catalogStore,
- SqlFactory sqlFactory) {
+ SqlFactory sqlFactory,
+ SecretStore secretStore) {
this.configuration = configuration;
this.classLoader = classLoader;
this.catalogStore = catalogStore;
this.sqlFactory = sqlFactory;
+ this.secretStore = secretStore;
}
/**
@@ -153,6 +157,11 @@ public class EnvironmentSettings {
return Optional.ofNullable(sqlFactory);
}
+ @Internal
+ public Optional<SecretStore> getSecretStore() {
+ return Optional.ofNullable(secretStore);
+ }
+
/** A builder for {@link EnvironmentSettings}. */
@PublicEvolving
public static class Builder {
@@ -162,6 +171,7 @@ public class EnvironmentSettings {
private @Nullable CatalogStore catalogStore;
private @Nullable SqlFactory sqlFactory;
+ private @Nullable SecretStore secretStore;
public Builder() {}
@@ -250,12 +260,28 @@ public class EnvironmentSettings {
return this;
}
+ /**
+ * Specifies the {@link SecretStore} to be used for managing secrets
in the {@link
+ * TableEnvironment}.
+ *
+ * <p>The secret store allows for secure storage and retrieval of
sensitive configuration
+ * data such as credentials, tokens, and passwords.
+ *
+ * @param secretStore the secret store instance to use
+ * @return this builder
+ */
+ public Builder withSecretStore(SecretStore secretStore) {
+ this.secretStore = secretStore;
+ return this;
+ }
+
/** Returns an immutable instance of {@link EnvironmentSettings}. */
public EnvironmentSettings build() {
if (classLoader == null) {
classLoader = Thread.currentThread().getContextClassLoader();
}
- return new EnvironmentSettings(configuration, classLoader,
catalogStore, sqlFactory);
+ return new EnvironmentSettings(
+ configuration, classLoader, catalogStore, sqlFactory,
secretStore);
}
}
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 388b4f969db..4743663e8a4 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -120,6 +120,8 @@ import
org.apache.flink.table.operations.utils.OperationTreeBuilder;
import org.apache.flink.table.resource.ResourceManager;
import org.apache.flink.table.resource.ResourceType;
import org.apache.flink.table.resource.ResourceUri;
+import org.apache.flink.table.secret.SecretStore;
+import org.apache.flink.table.secret.SecretStoreFactory;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;
@@ -258,6 +260,13 @@ public class TableEnvironmentImpl implements
TableEnvironmentInternal {
final CatalogStore catalogStore = catalogStoreResult.getCatalogStore();
final CatalogStoreFactory catalogStoreFactory =
catalogStoreResult.getCatalogStoreFactory();
+ final ApiFactoryUtil.SecretStoreResult secretStoreResult =
+ ApiFactoryUtil.getOrCreateSecretStore(
+ settings.getSecretStore(),
settings.getConfiguration(), userClassLoader);
+ final SecretStore secretStore = secretStoreResult.getSecretStore();
+ final SecretStoreFactory secretStoreFactory =
secretStoreResult.getSecretStoreFactory();
+ // TODO (FLINK-38261): pass secret store to catalog manager for
encryption/decryption
+
// use configuration to init table config
final TableConfig tableConfig = TableConfig.getDefault();
tableConfig.setRootConfiguration(executor.getConfiguration());
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ApiFactoryUtil.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ApiFactoryUtil.java
index cb3b9189d46..5062db03791 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ApiFactoryUtil.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ApiFactoryUtil.java
@@ -19,10 +19,14 @@
package org.apache.flink.table.factories;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DelegatingConfiguration;
import org.apache.flink.table.catalog.CatalogStore;
import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.secret.CommonSecretOptions;
+import org.apache.flink.table.secret.SecretStore;
+import org.apache.flink.table.secret.SecretStoreFactory;
import javax.annotation.Nullable;
@@ -127,4 +131,101 @@ public class ApiFactoryUtil {
return context;
}
+
+ /** Result holder for secret store and factory. */
+ @Internal
+ public static class SecretStoreResult {
+ private final SecretStore secretStore;
+ @Nullable private final SecretStoreFactory secretStoreFactory;
+
+ public SecretStoreResult(
+ SecretStore secretStore, @Nullable SecretStoreFactory
secretStoreFactory) {
+ this.secretStore = secretStore;
+ this.secretStoreFactory = secretStoreFactory;
+ }
+
+ public SecretStore getSecretStore() {
+ return secretStore;
+ }
+
+ @Nullable
+ public SecretStoreFactory getSecretStoreFactory() {
+ return secretStoreFactory;
+ }
+ }
+
+ /**
+ * Gets or creates a {@link SecretStore}. If a secret store is provided in
settings, it will be
+ * used directly. Otherwise, a new secret store will be created using the
factory.
+ *
+ * @param providedSecretStore the secret store from settings, if present
+ * @param configuration the configuration
+ * @param classLoader the user classloader
+ * @return a result containing the secret store and factory (factory is
null if store was
+ * provided)
+ */
+ public static SecretStoreResult getOrCreateSecretStore(
+ Optional<SecretStore> providedSecretStore,
+ Configuration configuration,
+ ClassLoader classLoader) {
+ if (providedSecretStore.isPresent()) {
+ return new SecretStoreResult(providedSecretStore.get(), null);
+ } else {
+ SecretStoreFactory secretStoreFactory =
+ findAndCreateSecretStoreFactory(configuration,
classLoader);
+ SecretStoreFactory.Context secretStoreFactoryContext =
+ buildSecretStoreFactoryContext(configuration, classLoader);
+ secretStoreFactory.open(secretStoreFactoryContext);
+ SecretStore secretStore = secretStoreFactory.createSecretStore();
+ return new SecretStoreResult(secretStore, secretStoreFactory);
+ }
+ }
+
+ /**
+ * Finds and creates a {@link SecretStoreFactory} using the provided
{@link Configuration} and
+ * user classloader.
+ *
+ * <p>The configuration format should be as follows:
+ *
+ * <pre>{@code
+ * table.secret-store.kind: {identifier}
+ * table.secret-store.{identifier}.{param1}: xxx
+ * table.secret-store.{identifier}.{param2}: xxx
+ * }</pre>
+ */
+ @VisibleForTesting
+ static SecretStoreFactory findAndCreateSecretStoreFactory(
+ Configuration configuration, ClassLoader classLoader) {
+ String identifier =
configuration.get(CommonSecretOptions.TABLE_SECRET_STORE_KIND);
+
+ SecretStoreFactory secretStoreFactory =
+ FactoryUtil.discoverFactory(classLoader,
SecretStoreFactory.class, identifier);
+
+ return secretStoreFactory;
+ }
+
+ /**
+ * Build a {@link SecretStoreFactory.Context} for opening the {@link
SecretStoreFactory}.
+ *
+ * <p>The configuration format should be as follows:
+ *
+ * <pre>{@code
+ * table.secret-store.kind: {identifier}
+ * table.secret-store.{identifier}.{param1}: xxx
+ * table.secret-store.{identifier}.{param2}: xxx
+ * }</pre>
+ */
+ @VisibleForTesting
+ static SecretStoreFactory.Context buildSecretStoreFactoryContext(
+ Configuration configuration, ClassLoader classLoader) {
+ String identifier =
configuration.get(CommonSecretOptions.TABLE_SECRET_STORE_KIND);
+ String secretStoreOptionPrefix =
+ CommonSecretOptions.TABLE_SECRET_STORE_OPTION_PREFIX +
identifier + ".";
+ Map<String, String> options =
+ new DelegatingConfiguration(configuration,
secretStoreOptionPrefix).toMap();
+ SecretStoreFactory.Context context =
+ new FactoryUtil.DefaultSecretStoreContext(options,
configuration, classLoader);
+
+ return context;
+ }
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStore.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStore.java
new file mode 100644
index 00000000000..7554bb789e6
--- /dev/null
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStore.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.secret;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.secret.exceptions.SecretNotFoundException;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A generic in-memory implementation of both {@link ReadableSecretStore} and
{@link
+ * WritableSecretStore}.
+ *
+ * <p>This implementation stores secrets in memory as immutable Map objects.
It is suitable for
+ * testing and development purposes but should not be used in production
environments as secrets are
+ * not encrypted.
+ */
+@Internal
+public class GenericInMemorySecretStore implements ReadableSecretStore,
WritableSecretStore {
+
+ private final Map<String, Map<String, String>> secrets;
+
+ public GenericInMemorySecretStore() {
+ this.secrets = new HashMap<>();
+ }
+
+ @Override
+ public Map<String, String> getSecret(String secretId) throws
SecretNotFoundException {
+ checkNotNull(secretId, "Secret ID cannot be null");
+
+ Map<String, String> secretData = secrets.get(secretId);
+ if (secretData == null) {
+ throw new SecretNotFoundException(
+ String.format("Secret with ID '%s' not found", secretId));
+ }
+
+ return secretData;
+ }
+
+ @Override
+ public String storeSecret(Map<String, String> secretData) {
+ checkNotNull(secretData, "Secret data cannot be null");
+
+ String secretId = UUID.randomUUID().toString();
+ secrets.put(secretId, Collections.unmodifiableMap(new
HashMap<>(secretData)));
+ return secretId;
+ }
+
+ @Override
+ public void removeSecret(String secretId) {
+ checkNotNull(secretId, "Secret ID cannot be null");
+ secrets.remove(secretId);
+ }
+
+ @Override
+ public void updateSecret(String secretId, Map<String, String>
newSecretData)
+ throws SecretNotFoundException {
+ checkNotNull(secretId, "Secret ID cannot be null");
+ checkNotNull(newSecretData, "New secret data cannot be null");
+
+ if (!secrets.containsKey(secretId)) {
+ throw new SecretNotFoundException(
+ String.format("Secret with ID '%s' not found", secretId));
+ }
+
+ secrets.put(secretId, Collections.unmodifiableMap(new
HashMap<>(newSecretData)));
+ }
+
+ /** Clears all secrets from the store (for testing purposes). */
+ void clear() {
+ secrets.clear();
+ }
+}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStoreFactory.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStoreFactory.java
new file mode 100644
index 00000000000..bbccfd4ec2f
--- /dev/null
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStoreFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.secret;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+
+import java.util.Set;
+
+/**
+ * Factory for creating {@link GenericInMemorySecretStore} instances.
+ *
+ * <p>This factory creates in-memory secret stores that are suitable for
testing and development
+ * purposes. Secrets are stored in plaintext JSON format in memory and are not
persisted.
+ */
+@Internal
+public class GenericInMemorySecretStoreFactory implements SecretStoreFactory {
+
+ public static final String IDENTIFIER = "generic_in_memory";
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ return Set.of();
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ return Set.of();
+ }
+
+ @Override
+ public SecretStore createSecretStore() {
+ return new GenericInMemorySecretStore();
+ }
+
+ @Override
+ public void open(Context context) {}
+
+ @Override
+ public void close() throws CatalogException {}
+}
diff --git
a/flink-table/flink-table-api-java/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
b/flink-table/flink-table-api-java/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index d86ead55ad6..8191a71a678 100644
---
a/flink-table/flink-table-api-java/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++
b/flink-table/flink-table-api-java/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -16,3 +16,4 @@
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.catalog.GenericInMemoryCatalogStoreFactory
org.apache.flink.table.catalog.FileCatalogStoreFactory
+org.apache.flink.table.secret.GenericInMemorySecretStoreFactory
diff --git
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ApiFactoryUtilTest.java
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ApiFactoryUtilTest.java
index c8e1908291b..249fd16bbc8 100644
---
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ApiFactoryUtilTest.java
+++
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ApiFactoryUtilTest.java
@@ -22,6 +22,9 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.flink.table.catalog.FileCatalogStoreFactory;
import org.apache.flink.table.catalog.GenericInMemoryCatalogStoreFactory;
+import org.apache.flink.table.secret.CommonSecretOptions;
+import org.apache.flink.table.secret.GenericInMemorySecretStoreFactory;
+import org.apache.flink.table.secret.SecretStoreFactory;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -133,4 +136,78 @@ class ApiFactoryUtilTest {
Arguments.of("file", FileCatalogStoreFactory.class),
Arguments.of(null, GenericInMemoryCatalogStoreFactory.class));
}
+
+ @ParameterizedTest(name = "kind={0}, expectedFactory={1}")
+ @MethodSource("secretStoreFactoryTestParameters")
+ void testFindAndCreateSecretStoreFactory(String kind, Class<?>
expectedFactoryClass) {
+ Configuration configuration = new Configuration();
+ if (kind != null) {
+ configuration.set(CommonSecretOptions.TABLE_SECRET_STORE_KIND,
kind);
+ }
+ ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
+
+ SecretStoreFactory factory =
+ ApiFactoryUtil.findAndCreateSecretStoreFactory(configuration,
classLoader);
+
+ assertThat(factory).isInstanceOf(expectedFactoryClass);
+ }
+
+ @Test
+ void testBuildSecretStoreFactoryContext() {
+ Configuration configuration = new Configuration();
+ configuration.set(CommonSecretOptions.TABLE_SECRET_STORE_KIND,
"generic_in_memory");
+
configuration.setString("table.secret-store.generic_in_memory.option1",
"value1");
+
configuration.setString("table.secret-store.generic_in_memory.option2",
"value2");
+ ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
+
+ SecretStoreFactory.Context context =
+ ApiFactoryUtil.buildSecretStoreFactoryContext(configuration,
classLoader);
+
+ assertThat(context).isNotNull();
+ assertThat(context.getOptions())
+ .containsExactlyInAnyOrderEntriesOf(
+ Map.of("option1", "value1", "option2", "value2"));
+ assertThat(context.getConfiguration()).isEqualTo(configuration);
+ assertThat(context.getClassLoader()).isEqualTo(classLoader);
+ }
+
+ @Test
+ void testBuildSecretStoreFactoryContextWithoutOptions() {
+ Configuration configuration = new Configuration();
+ configuration.set(CommonSecretOptions.TABLE_SECRET_STORE_KIND,
"generic_in_memory");
+ ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
+
+ SecretStoreFactory.Context context =
+ ApiFactoryUtil.buildSecretStoreFactoryContext(configuration,
classLoader);
+
+ assertThat(context).isNotNull();
+ assertThat(context.getOptions()).isEmpty();
+ assertThat(context.getConfiguration()).isEqualTo(configuration);
+ assertThat(context.getClassLoader()).isEqualTo(classLoader);
+ }
+
+ @Test
+ void testBuildSecretStoreFactoryContextOnlyExtractsRelevantOptions() {
+ Configuration configuration = new Configuration();
+ configuration.set(CommonSecretOptions.TABLE_SECRET_STORE_KIND,
"generic_in_memory");
+
configuration.setString("table.secret-store.generic_in_memory.option1",
"value1");
+
configuration.setString("table.secret-store.generic_in_memory.option2",
"value2");
+ configuration.setString("table.secret-store.other.irrelevant",
"should-not-appear");
+ configuration.setString("other.config.key", "should-not-appear");
+ ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
+
+ SecretStoreFactory.Context context =
+ ApiFactoryUtil.buildSecretStoreFactoryContext(configuration,
classLoader);
+
+ assertThat(context).isNotNull();
+ assertThat(context.getOptions())
+ .containsExactlyInAnyOrderEntriesOf(
+ Map.of("option1", "value1", "option2", "value2"));
+ }
+
+ private static Stream<Arguments> secretStoreFactoryTestParameters() {
+ return Stream.of(
+ Arguments.of("generic_in_memory",
GenericInMemorySecretStoreFactory.class),
+ Arguments.of(null, GenericInMemorySecretStoreFactory.class));
+ }
}
diff --git
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/secret/GenericInMemorySecretStoreFactoryTest.java
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/secret/GenericInMemorySecretStoreFactoryTest.java
new file mode 100644
index 00000000000..a9351143306
--- /dev/null
+++
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/secret/GenericInMemorySecretStoreFactoryTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.secret;
+
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.FactoryUtil.DefaultSecretStoreContext;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link GenericInMemorySecretStoreFactory}. */
+class GenericInMemorySecretStoreFactoryTest {
+
+ @Test
+ void testSecretStoreInit() {
+ String factoryIdentifier =
GenericInMemorySecretStoreFactory.IDENTIFIER;
+ Map<String, String> options = new HashMap<>();
+ ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
+ final DefaultSecretStoreContext discoveryContext =
+ new DefaultSecretStoreContext(options, null, classLoader);
+ final SecretStoreFactory factory =
+ FactoryUtil.discoverFactory(
+ classLoader, SecretStoreFactory.class,
factoryIdentifier);
+ try {
+ factory.open(discoveryContext);
+ SecretStore secretStore = factory.createSecretStore();
+ assertThat(secretStore instanceof
GenericInMemorySecretStore).isTrue();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ factory.close();
+ }
+ }
+
+ @Test
+ void testFactoryIdentifier() {
+ GenericInMemorySecretStoreFactory factory = new
GenericInMemorySecretStoreFactory();
+ assertThat(factory.factoryIdentifier()).isEqualTo("generic_in_memory");
+ }
+
+ @Test
+ void testRequiredAndOptionalOptions() {
+ GenericInMemorySecretStoreFactory factory = new
GenericInMemorySecretStoreFactory();
+ assertThat(factory.requiredOptions()).isEmpty();
+ assertThat(factory.optionalOptions()).isEmpty();
+ }
+}
diff --git
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/secret/GenericInMemorySecretStoreTest.java
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/secret/GenericInMemorySecretStoreTest.java
new file mode 100644
index 00000000000..53674e1e5c0
--- /dev/null
+++
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/secret/GenericInMemorySecretStoreTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.secret;
+
+import org.apache.flink.table.secret.exceptions.SecretNotFoundException;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+
+/** Test for {@link GenericInMemorySecretStore}. */
+class GenericInMemorySecretStoreTest {
+
+ private GenericInMemorySecretStore secretStore;
+
+ @BeforeEach
+ void setUp() {
+ secretStore = new GenericInMemorySecretStore();
+ }
+
+ @Test
+ void testStoreAndGetSecret() throws SecretNotFoundException {
+ Map<String, String> secretData = Map.of("username", "testuser",
"password", "testpass");
+
+ String secretId = secretStore.storeSecret(secretData);
+ assertThat(secretId).isNotNull();
+
+ Map<String, String> retrievedSecret = secretStore.getSecret(secretId);
+ assertThat(retrievedSecret).isNotNull();
+ assertThat(retrievedSecret.get("username")).isEqualTo("testuser");
+ assertThat(retrievedSecret.get("password")).isEqualTo("testpass");
+ }
+
+ @Test
+ void testGetNonExistentSecret() {
+ assertThatThrownBy(() -> secretStore.getSecret("non-existent-id"))
+ .isInstanceOf(SecretNotFoundException.class)
+ .hasMessageContaining("Secret with ID 'non-existent-id' not
found");
+ }
+
+ @Test
+ void testGetSecretWithNullId() {
+ assertThatThrownBy(() -> secretStore.getSecret(null))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessageContaining("Secret ID cannot be null");
+ }
+
+ @Test
+ void testStoreSecretWithNullData() {
+ assertThatThrownBy(() -> secretStore.storeSecret(null))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessageContaining("Secret data cannot be null");
+ }
+
+ @Test
+ void testRemoveSecret() throws SecretNotFoundException {
+ Map<String, String> secretData = Map.of("key", "value");
+
+ String secretId = secretStore.storeSecret(secretData);
+ assertThat(secretStore.getSecret(secretId)).isNotNull();
+
+ secretStore.removeSecret(secretId);
+ assertThatThrownBy(() -> secretStore.getSecret(secretId))
+ .isInstanceOf(SecretNotFoundException.class)
+ .hasMessageContaining("Secret with ID '" + secretId + "' not
found");
+ }
+
+ @Test
+ void testRemoveSecretWithNullId() {
+ assertThatThrownBy(() -> secretStore.removeSecret(null))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessageContaining("Secret ID cannot be null");
+ }
+
+ @Test
+ void testRemoveNonExistentSecret() {
+ // Should not throw exception, just silently remove nothing
+ assertDoesNotThrow(() -> secretStore.removeSecret("non-existent-id"));
+ }
+
+ @Test
+ void testUpdateSecret() throws SecretNotFoundException {
+ Map<String, String> originalData = Map.of("username", "olduser",
"password", "oldpass");
+
+ String secretId = secretStore.storeSecret(originalData);
+
+ Map<String, String> updatedData = Map.of("username", "newuser",
"password", "newpass");
+
+ secretStore.updateSecret(secretId, updatedData);
+
+ Map<String, String> retrievedSecret = secretStore.getSecret(secretId);
+ assertThat(retrievedSecret.get("username")).isEqualTo("newuser");
+ assertThat(retrievedSecret.get("password")).isEqualTo("newpass");
+ }
+
+ @Test
+ void testUpdateNonExistentSecret() {
+ Map<String, String> secretData = Map.of("key", "value");
+
+ assertThatThrownBy(() -> secretStore.updateSecret("non-existent-id",
secretData))
+ .isInstanceOf(SecretNotFoundException.class)
+ .hasMessageContaining("Secret with ID 'non-existent-id' not
found");
+ }
+
+ @Test
+ void testUpdateSecretWithNullId() {
+ Map<String, String> secretData = Map.of("key", "value");
+
+ assertThatThrownBy(() -> secretStore.updateSecret(null, secretData))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessageContaining("Secret ID cannot be null");
+ }
+
+ @Test
+ void testUpdateSecretWithNullData() {
+ Map<String, String> originalData = Map.of("key", "value");
+ String secretId = secretStore.storeSecret(originalData);
+
+ assertThatThrownBy(() -> secretStore.updateSecret(secretId, null))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessageContaining("New secret data cannot be null");
+ }
+
+ @Test
+ void testClear() {
+ Map<String, String> secretData1 = Map.of("key1", "value1");
+ String secretId1 = secretStore.storeSecret(secretData1);
+
+ Map<String, String> secretData2 = Map.of("key2", "value2");
+ String secretId2 = secretStore.storeSecret(secretData2);
+
+ secretStore.clear();
+
+ assertThatThrownBy(() -> secretStore.getSecret(secretId1))
+ .isInstanceOf(SecretNotFoundException.class);
+ assertThatThrownBy(() -> secretStore.getSecret(secretId2))
+ .isInstanceOf(SecretNotFoundException.class);
+ }
+
+ @Test
+ void testStoreEmptySecret() throws SecretNotFoundException {
+ Map<String, String> emptyData = Map.of();
+ String secretId = secretStore.storeSecret(emptyData);
+
+ Map<String, String> retrievedSecret = secretStore.getSecret(secretId);
+ assertThat(retrievedSecret).isNotNull();
+ assertThat(retrievedSecret).isEmpty();
+ }
+
+ @Test
+ void testStoreMultipleSecrets() throws SecretNotFoundException {
+ Map<String, String> secret1 = Map.of("user1", "pass1");
+
+ Map<String, String> secret2 = Map.of("user2", "pass2");
+
+ String secretId1 = secretStore.storeSecret(secret1);
+ String secretId2 = secretStore.storeSecret(secret2);
+
+ assertThat(secretId1).isNotEqualTo(secretId2);
+
+ Map<String, String> retrieved1 = secretStore.getSecret(secretId1);
+ Map<String, String> retrieved2 = secretStore.getSecret(secretId2);
+
+ assertThat(retrieved1.get("user1")).isEqualTo("pass1");
+ assertThat(retrieved2.get("user2")).isEqualTo("pass2");
+ }
+}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
index 691397c08d4..594a7bf1d5d 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
@@ -42,6 +42,7 @@ import
org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.legacy.factories.TableFactory;
import org.apache.flink.table.ml.ModelProvider;
import org.apache.flink.table.module.Module;
+import org.apache.flink.table.secret.SecretStoreFactory;
import org.apache.flink.table.utils.EncodingUtils;
import org.apache.flink.table.watermark.WatermarkEmitStrategy;
import org.apache.flink.util.Preconditions;
@@ -1411,6 +1412,41 @@ public final class FactoryUtil {
}
}
+ /** Default implementation of {@link SecretStoreFactory.Context}. */
+ @Internal
+ public static class DefaultSecretStoreContext implements
SecretStoreFactory.Context {
+
+ private Map<String, String> options;
+
+ private ReadableConfig configuration;
+
+ private ClassLoader classLoader;
+
+ public DefaultSecretStoreContext(
+ Map<String, String> options,
+ ReadableConfig configuration,
+ ClassLoader classLoader) {
+ this.options = options;
+ this.configuration = configuration;
+ this.classLoader = classLoader;
+ }
+
+ @Override
+ public Map<String, String> getOptions() {
+ return options;
+ }
+
+ @Override
+ public ReadableConfig getConfiguration() {
+ return configuration;
+ }
+
+ @Override
+ public ClassLoader getClassLoader() {
+ return classLoader;
+ }
+ }
+
/** Default implementation of {@link ModuleFactory.Context}. */
@Internal
public static class DefaultModuleContext implements ModuleFactory.Context {
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/CommonSecretOptions.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/CommonSecretOptions.java
new file mode 100644
index 00000000000..461909a6cd1
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/CommonSecretOptions.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.secret;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/** A collection of {@link ConfigOption} which are used for secret store
configuration. */
+@Internal
+public class CommonSecretOptions {
+
+ public static final String DEFAULT_SECRET_STORE_KIND = "generic_in_memory";
+ public static final ConfigOption<String> TABLE_SECRET_STORE_KIND =
+ ConfigOptions.key("table.secret-store.kind")
+ .stringType()
+ .defaultValue(DEFAULT_SECRET_STORE_KIND)
+ .withDescription(
+ "The kind of secret store to be used. Out of the
box, 'generic_in_memory' option is supported. "
+ + "Implementations can provide custom
secret stores for different backends "
+ + "(e.g., cloud-specific secret
managers).");
+
+ /** Used to filter the specific options for secret store. */
+ public static final String TABLE_SECRET_STORE_OPTION_PREFIX =
"table.secret-store.";
+}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/ReadableSecretStore.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/ReadableSecretStore.java
new file mode 100644
index 00000000000..70dd48e21f9
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/ReadableSecretStore.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.secret;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.secret.exceptions.SecretNotFoundException;
+
+import java.util.Map;
+
+/**
+ * Interface for retrieving secrets from a secret store.
+ *
+ * <p>This interface enables read-only access to stored secrets, allowing
applications to retrieve
+ * sensitive configuration data such as credentials, API tokens, and passwords.
+ *
+ * <p>Implementations of this interface should ensure secure retrieval and
handling of secret data.
+ */
+@PublicEvolving
+public interface ReadableSecretStore extends SecretStore {
+
+ /**
+ * Retrieves a secret from the store by its identifier.
+ *
+ * @param secretId the unique identifier of the secret to retrieve
+ * @return a map containing the secret data as key-value pairs
+ * @throws SecretNotFoundException if the secret with the given identifier
does not exist
+ */
+ Map<String, String> getSecret(String secretId) throws
SecretNotFoundException;
+}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/SecretStore.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/SecretStore.java
new file mode 100644
index 00000000000..89af1d469e1
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/SecretStore.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.secret;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Base marker interface for secret store implementations.
+ *
+ * <p>This interface serves as the common base for both {@link
ReadableSecretStore} and {@link
+ * WritableSecretStore}, allowing for flexible secret management
implementations.
+ *
+ * <p>Secret stores are used to manage sensitive configuration data
(credentials, tokens, passwords,
+ * etc.) in Flink SQL and Table API applications.
+ */
+@PublicEvolving
+public interface SecretStore {}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/SecretStoreFactory.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/SecretStoreFactory.java
new file mode 100644
index 00000000000..1fd40b8e78b
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/SecretStoreFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.secret;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.secret.exceptions.SecretException;
+
+import java.util.Map;
+
+/** Factory for creating SecretStore instances. */
+@PublicEvolving
+public interface SecretStoreFactory extends Factory {
+
+ /** Creates a SecretStore instance. */
+ SecretStore createSecretStore();
+
+ /** Initialize secret store. */
+ void open(Context context) throws SecretException;
+
+ /** Close secret store. */
+ void close() throws CatalogException;
+
+ /** Context for creating a SecretStore. */
+ @PublicEvolving
+ interface Context {
+ /**
+ * Returns the options with which the secret store is created.
+ *
+ * <p>An implementation should perform validation of these options.
+ */
+ Map<String, String> getOptions();
+
+ /** Gives read-only access to the configuration of the current
session. */
+ ReadableConfig getConfiguration();
+
+ /**
+ * Returns the class loader of the current session.
+ *
+ * <p>The class loader is in particular useful for discovering further
(nested) factories.
+ */
+ ClassLoader getClassLoader();
+ }
+}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/WritableSecretStore.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/WritableSecretStore.java
new file mode 100644
index 00000000000..db5037b7b2f
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/WritableSecretStore.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.secret;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.secret.exceptions.SecretNotFoundException;
+
+import java.util.Map;
+
+/**
+ * Interface for storing, updating, and removing secrets in a secret store.
+ *
+ * <p>This interface provides write operations for managing secrets, including
adding new secrets,
+ * updating existing ones, and removing secrets that are no longer needed.
+ *
+ * <p>Implementations should ensure that secret operations are performed
securely and, where
+ * applicable, atomically.
+ */
+@PublicEvolving
+public interface WritableSecretStore extends SecretStore {
+
+ /**
+ * Stores a new secret in the secret store.
+ *
+ * @param secretData a map containing the secret data as key-value pairs
to be stored
+ * @return a unique identifier for the stored secret
+ */
+ String storeSecret(Map<String, String> secretData);
+
+ /**
+ * Removes a secret from the secret store.
+ *
+ * @param secretId the unique identifier of the secret to remove
+ */
+ void removeSecret(String secretId);
+
+ /**
+ * Atomically updates an existing secret with new data.
+ *
+ * <p>This operation replaces the entire secret data with the provided new
data.
+ *
+ * @param secretId the unique identifier of the secret to update
+ * @param newSecretData a map containing the new secret data as key-value
pairs
+ * @throws SecretNotFoundException if the secret with the given identifier
does not exist
+ */
+ void updateSecret(String secretId, Map<String, String> newSecretData)
+ throws SecretNotFoundException;
+}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/exceptions/SecretException.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/exceptions/SecretException.java
new file mode 100644
index 00000000000..b856938ba6a
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/exceptions/SecretException.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.secret.exceptions;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Base exception for all secret-related errors.
+ *
+ * <p>This exception serves as the parent class for all secret related
exceptions, providing a
+ * common type for handling errors that occur during secret management
operations.
+ */
+@PublicEvolving
+public class SecretException extends RuntimeException {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Constructs a new SecretException with the specified detail message.
+ *
+ * @param message the detail message explaining the reason for the
exception
+ */
+ public SecretException(String message) {
+ super(message);
+ }
+
+ /**
+ * Constructs a new SecretException with the specified detail message and
cause.
+ *
+ * @param message the detail message explaining the reason for the
exception
+ * @param cause the cause of the exception
+ */
+ public SecretException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * Constructs a new SecretException with the specified cause.
+ *
+ * @param cause the cause of the exception
+ */
+ public SecretException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/exceptions/SecretNotFoundException.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/exceptions/SecretNotFoundException.java
new file mode 100644
index 00000000000..a773ea7e1e6
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/exceptions/SecretNotFoundException.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.secret.exceptions;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Exception thrown when a requested secret cannot be found in the secret
store.
+ *
+ * <p>This exception is typically thrown by {@link
ReadableSecretStore#getSecret(String)} or {@link
+ * WritableSecretStore#updateSecret(String, java.util.Map)} when attempting to
access or modify a
+ * secret that does not exist.
+ */
+@PublicEvolving
+public class SecretNotFoundException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Constructs a new SecretNotFoundException with the specified detail
message.
+ *
+ * @param message the detail message explaining the reason for the
exception
+ */
+ public SecretNotFoundException(String message) {
+ super(message);
+ }
+
+ /**
+ * Constructs a new SecretNotFoundException with the specified detail
message and cause.
+ *
+ * @param message the detail message explaining the reason for the
exception
+ * @param cause the cause of the exception
+ */
+ public SecretNotFoundException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}