This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 587a9d8b93e [FLINK-39590][table] Add support for CONNECTION in catalog 
APIs
587a9d8b93e is described below

commit 587a9d8b93e9b0089d645bb7c4da54dba2619a81
Author: Hao Li <[email protected]>
AuthorDate: Tue May 26 08:08:11 2026 -0700

    [FLINK-39590][table] Add support for CONNECTION in catalog APIs
    
    This closes #28085.
---
 .../table/tests/test_catalog_completeness.py       |   8 +-
 .../table/api/internal/TableEnvironmentImpl.java   |   7 +
 .../apache/flink/table/catalog/CatalogManager.java | 383 +++++++++++++++++++-
 .../table/catalog/GenericInMemoryCatalog.java      |  76 ++++
 .../catalog/listener/AlterConnectionEvent.java     |  72 ++++
 .../listener/ConnectionModificationEvent.java      |  34 ++
 .../catalog/listener/CreateConnectionEvent.java    |  71 ++++
 .../catalog/listener/DropConnectionEvent.java      |  72 ++++
 .../flink/table/catalog/CatalogManagerTest.java    | 222 ++++++++++++
 .../table/catalog/GenericInMemoryCatalogTest.java  |   5 +
 .../org/apache/flink/table/catalog/Catalog.java    | 100 ++++++
 .../ConnectionAlreadyExistException.java           |  38 ++
 .../exceptions/ConnectionNotExistException.java    |  46 +++
 .../flink/table/factories/ConnectionFactory.java   |  93 +++++
 .../table/factories/DefaultConnectionFactory.java  | 191 ++++++++++
 .../apache/flink/table/factories/FactoryUtil.java  |   8 +
 .../flink/table/secret/ReadableSecretStore.java    |   5 +-
 .../flink/table/secret/WritableSecretStore.java    |  13 +-
 .../org.apache.flink.table.factories.Factory       |   1 +
 .../apache/flink/table/catalog/CatalogTest.java    | 222 ++++++++++++
 .../factories/DefaultConnectionFactoryTest.java    | 392 +++++++++++++++++++++
 21 files changed, 2052 insertions(+), 7 deletions(-)

diff --git a/flink-python/pyflink/table/tests/test_catalog_completeness.py 
b/flink-python/pyflink/table/tests/test_catalog_completeness.py
index c1ab64c96aa..3e20aab37ca 100644
--- a/flink-python/pyflink/table/tests/test_catalog_completeness.py
+++ b/flink-python/pyflink/table/tests/test_catalog_completeness.py
@@ -45,7 +45,13 @@ class 
CatalogAPICompletenessTests(PythonAPICompletenessTestCase, PyFlinkTestCase
             'getFactory',
             'getTableFactory',
             'getFunctionDefinitionFactory',
-            'listPartitionsByFilter'}
+            'listPartitionsByFilter',
+            'getConnection',
+            'dropConnection',
+            'connectionExists',
+            'listConnections',
+            'createConnection',
+            'alterConnection'}
 
 
 class CatalogDatabaseAPICompletenessTests(PythonAPICompletenessTestCase, 
PyFlinkTestCase):
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 4743663e8a4..496369e78f2 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -122,6 +122,7 @@ import org.apache.flink.table.resource.ResourceType;
 import org.apache.flink.table.resource.ResourceUri;
 import org.apache.flink.table.secret.SecretStore;
 import org.apache.flink.table.secret.SecretStoreFactory;
+import org.apache.flink.table.secret.WritableSecretStore;
 import org.apache.flink.table.types.AbstractDataType;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.utils.DataTypeUtils;
@@ -275,6 +276,11 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
         final ResourceManager resourceManager =
                 new ResourceManager(settings.getConfiguration(), 
userClassLoader);
         final ModuleManager moduleManager = new ModuleManager();
+        final WritableSecretStore writableSecretStore =
+                secretStore instanceof WritableSecretStore
+                        ? (WritableSecretStore) secretStore
+                        : null;
+
         final CatalogManager catalogManager =
                 CatalogManager.newBuilder()
                         .classLoader(userClassLoader)
@@ -297,6 +303,7 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
                         .sqlFactory(
                                 settings.getSqlFactory()
                                         .orElseGet(() -> 
DefaultSqlFactory.INSTANCE))
+                        .writableSecretStore(writableSecretStore)
                         .build();
 
         final FunctionCatalog functionCatalog =
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index 047d2c89d5b..cd375bc9554 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.catalog;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.CatalogNotExistException;
 import org.apache.flink.table.api.EnvironmentSettings;
@@ -33,6 +34,8 @@ import 
org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
 import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
 import org.apache.flink.table.catalog.StartMode.StartModeKind;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
+import 
org.apache.flink.table.catalog.exceptions.ConnectionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.ConnectionNotExistException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
@@ -41,14 +44,17 @@ import 
org.apache.flink.table.catalog.exceptions.ModelNotExistException;
 import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.listener.AlterConnectionEvent;
 import org.apache.flink.table.catalog.listener.AlterDatabaseEvent;
 import org.apache.flink.table.catalog.listener.AlterModelEvent;
 import org.apache.flink.table.catalog.listener.AlterTableEvent;
 import org.apache.flink.table.catalog.listener.CatalogContext;
 import org.apache.flink.table.catalog.listener.CatalogModificationListener;
+import org.apache.flink.table.catalog.listener.CreateConnectionEvent;
 import org.apache.flink.table.catalog.listener.CreateDatabaseEvent;
 import org.apache.flink.table.catalog.listener.CreateModelEvent;
 import org.apache.flink.table.catalog.listener.CreateTableEvent;
+import org.apache.flink.table.catalog.listener.DropConnectionEvent;
 import org.apache.flink.table.catalog.listener.DropDatabaseEvent;
 import org.apache.flink.table.catalog.listener.DropModelEvent;
 import org.apache.flink.table.catalog.listener.DropTableEvent;
@@ -57,9 +63,14 @@ import org.apache.flink.table.delegation.Planner;
 import org.apache.flink.table.expressions.DefaultSqlFactory;
 import org.apache.flink.table.expressions.SqlFactory;
 import 
org.apache.flink.table.expressions.resolver.ExpressionResolver.ExpressionResolverBuilder;
+import org.apache.flink.table.factories.ConnectionFactory;
+import org.apache.flink.table.factories.DefaultConnectionFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.secret.GenericInMemorySecretStore;
+import org.apache.flink.table.secret.WritableSecretStore;
+import org.apache.flink.table.secret.exceptions.SecretException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
@@ -98,6 +109,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 @Internal
 public final class CatalogManager implements CatalogRegistry, AutoCloseable {
+
     private static final Logger LOG = 
LoggerFactory.getLogger(CatalogManager.class);
 
     // A map between names and catalogs.
@@ -111,6 +123,15 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
     // models coming from catalogs.
     private final Map<ObjectIdentifier, CatalogModel> temporaryModels;
 
+    // Those connections take precedence over corresponding permanent 
connections, thus they shadow
+    // connections coming from catalogs.
+    private final Map<ObjectIdentifier, CatalogConnection> 
temporaryConnections;
+
+    // Backing store for secrets of temporary connections. Lifetime is tied to 
this
+    // CatalogManager — temporary connections are session-scoped, so their 
secrets should not
+    // be persisted in the configured (potentially persistent) 
writableSecretStore.
+    private final WritableSecretStore temporarySecretStore;
+
     // The name of the current catalog and database
     private @Nullable String currentCatalogName;
 
@@ -132,6 +153,10 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
 
     private final MaterializedTableEnricher materializedTableEnricher;
 
+    private final ClassLoader userClassLoader;
+
+    @Nullable private final WritableSecretStore writableSecretStore;
+
     private CatalogManager(
             String defaultCatalogName,
             Catalog defaultCatalog,
@@ -139,7 +164,9 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
             List<CatalogModificationListener> catalogModificationListeners,
             CatalogStoreHolder catalogStoreHolder,
             SqlFactory sqlFactory,
-            MaterializedTableEnricher materializedTableEnricher) {
+            MaterializedTableEnricher materializedTableEnricher,
+            ClassLoader userClassLoader,
+            @Nullable WritableSecretStore writableSecretStore) {
         checkArgument(
                 !StringUtils.isNullOrWhitespaceOnly(defaultCatalogName),
                 "Default catalog name cannot be null or empty");
@@ -152,6 +179,8 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
 
         temporaryTables = new HashMap<>();
         temporaryModels = new HashMap<>();
+        temporaryConnections = new HashMap<>();
+        temporarySecretStore = new GenericInMemorySecretStore();
 
         // right now the default catalog is always the built-in one
         builtInCatalogName = defaultCatalogName;
@@ -164,6 +193,21 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
         this.sqlFactory = sqlFactory;
         this.materializedTableEnricher =
                 checkNotNull(materializedTableEnricher, 
"MaterializedTableEnricher cannot be null");
+        this.userClassLoader = checkNotNull(userClassLoader, "User class 
loader cannot be null");
+        this.writableSecretStore = writableSecretStore;
+    }
+
+    /**
+     * Discovers the {@link ConnectionFactory} for the given connection 
options via SPI, using the
+     * {@link FactoryUtil#CONNECTION_TYPE} option as the factory identifier. 
Falls back to {@link
+     * DefaultConnectionFactory} via the option's default value when {@code 
type} is absent.
+     *
+     * @param options the options of the connection being created / altered / 
dropped
+     * @return the discovered factory
+     */
+    private ConnectionFactory discoverConnectionFactory(Map<String, String> 
options) {
+        final String identifier = 
Configuration.fromMap(options).get(FactoryUtil.CONNECTION_TYPE);
+        return FactoryUtil.discoverFactory(userClassLoader, 
ConnectionFactory.class, identifier);
     }
 
     @VisibleForTesting
@@ -203,6 +247,8 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
 
         private MaterializedTableEnricher materializedTableEnricher;
 
+        private @Nullable WritableSecretStore writableSecretStore;
+
         public Builder classLoader(ClassLoader classLoader) {
             this.classLoader = classLoader;
             return this;
@@ -251,6 +297,11 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
             return this;
         }
 
+        public Builder writableSecretStore(@Nullable WritableSecretStore 
writableSecretStore) {
+            this.writableSecretStore = writableSecretStore;
+            return this;
+        }
+
         public CatalogManager build() {
             checkNotNull(classLoader, "Class loader cannot be null");
             checkNotNull(config, "Config cannot be null");
@@ -271,7 +322,9 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
                     sqlFactory,
                     materializedTableEnricher != null
                             ? materializedTableEnricher
-                            : createDefaultMaterializedTableEnricher());
+                            : createDefaultMaterializedTableEnricher(),
+                    classLoader,
+                    writableSecretStore);
         }
 
         private MaterializedTableEnricher 
createDefaultMaterializedTableEnricher() {
@@ -1791,6 +1844,330 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
         return ResolvedCatalogModel.of(model, resolvedInputSchema, 
resolvedOutputSchema);
     }
 
+    // ------ connections ------
+
+    /**
+     * Get a connection from the catalog with the given object identifier.
+     *
+     * @param objectIdentifier The fully qualified path of the connection.
+     * @return The requested connection wrapped in Optional.
+     */
+    public Optional<CatalogConnection> getConnection(ObjectIdentifier 
objectIdentifier) {
+        CatalogConnection temporaryConnection = 
temporaryConnections.get(objectIdentifier);
+        if (temporaryConnection != null) {
+            return Optional.of(temporaryConnection);
+        }
+
+        Optional<Catalog> catalog = 
getCatalog(objectIdentifier.getCatalogName());
+        if (catalog.isPresent()) {
+            try {
+                return 
Optional.of(catalog.get().getConnection(objectIdentifier.toObjectPath()));
+            } catch (ConnectionNotExistException | 
UnsupportedOperationException e) {
+                // ConnectionNotExistException: connection does not exist in 
this catalog.
+                // UnsupportedOperationException: catalog does not support 
connections.
+                return Optional.empty();
+            }
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    /**
+     * List all connections in the given catalog and database.
+     *
+     * @param catalogName The name of the catalog.
+     * @param databaseName The name of the database.
+     * @return A set of connection names.
+     */
+    public Set<String> listConnections(String catalogName, String 
databaseName) {
+        Catalog catalog = getCatalogOrError(catalogName);
+        try {
+            Set<String> connections = new 
HashSet<>(catalog.listConnections(databaseName));
+
+            // Add temporary connections for this catalog and database
+            temporaryConnections.keySet().stream()
+                    .filter(
+                            identifier ->
+                                    
identifier.getCatalogName().equals(catalogName)
+                                            && 
identifier.getDatabaseName().equals(databaseName))
+                    .map(ObjectIdentifier::getObjectName)
+                    .forEach(connections::add);
+
+            return connections;
+        } catch (DatabaseNotExistException e) {
+            throw new ValidationException(
+                    String.format(
+                            "Database '%s' does not exist in catalog '%s'.",
+                            databaseName, catalogName),
+                    e);
+        } catch (CatalogException e) {
+            throw new TableException(
+                    String.format(
+                            "Failed to list connections in catalog '%s' and 
database '%s'.",
+                            catalogName, databaseName),
+                    e);
+        }
+    }
+
+    /**
+     * Create a permanent connection in the given fully qualified path.
+     *
+     * <p>The {@link ConnectionFactory} is discovered from the connection's 
{@code type} option (see
+     * FLIP-529). If a {@link WritableSecretStore} is configured, sensitive 
fields are extracted
+     * from the connection and stored in the secret store before persisting 
the non-sensitive {@link
+     * CatalogConnection} to the catalog.
+     *
+     * @param connection The connection with all options including sensitive 
fields.
+     * @param objectIdentifier The fully qualified path where to create the 
connection.
+     * @param ignoreIfExists If false exception will be thrown if the 
connection already exists.
+     */
+    public void createConnection(
+            SensitiveConnection connection,
+            ObjectIdentifier objectIdentifier,
+            boolean ignoreIfExists) {
+        if (writableSecretStore == null) {
+            throw new ValidationException(
+                    "WritableSecretStore must be configured to create 
connections.");
+        }
+        if (getConnection(objectIdentifier).isPresent()) {
+            if (ignoreIfExists) {
+                return;
+            }
+            throw new ValidationException(
+                    String.format(
+                            "Connection with identifier '%s' already exists.",
+                            objectIdentifier.asSummaryString()));
+        }
+        final ConnectionFactory connectionFactory =
+                discoverConnectionFactory(connection.getOptions());
+        final CatalogConnection catalogConnection =
+                connectionFactory.createConnection(connection, 
writableSecretStore);
+        boolean persisted = false;
+        try {
+            execute(
+                    (catalog, path) -> {
+                        catalog.createConnection(path, catalogConnection, 
ignoreIfExists);
+                        catalogModificationListeners.forEach(
+                                listener ->
+                                        listener.onEvent(
+                                                
CreateConnectionEvent.createEvent(
+                                                        
CatalogContext.createContext(
+                                                                
objectIdentifier.getCatalogName(),
+                                                                catalog),
+                                                        objectIdentifier,
+                                                        catalogConnection,
+                                                        ignoreIfExists,
+                                                        false)));
+                    },
+                    objectIdentifier,
+                    ignoreIfExists,
+                    "CreateConnection");
+            persisted = true;
+        } finally {
+            if (!persisted) {
+                tryDeleteSecrets(
+                        catalogConnection,
+                        writableSecretStore,
+                        "rollback createConnection " + objectIdentifier);
+            }
+        }
+    }
+
+    /**
+     * Create a temporary connection in the given fully qualified path.
+     *
+     * @param connection The connection with all options including sensitive 
fields.
+     * @param objectIdentifier The fully qualified path where to create the 
connection.
+     * @param ignoreIfExists If false exception will be thrown if the 
connection already exists.
+     */
+    public void createTemporaryConnection(
+            SensitiveConnection connection,
+            ObjectIdentifier objectIdentifier,
+            boolean ignoreIfExists) {
+        if (temporaryConnections.containsKey(objectIdentifier)) {
+            if (ignoreIfExists) {
+                return;
+            }
+            throw new ValidationException(
+                    String.format("Temporary connection '%s' already exists.", 
objectIdentifier));
+        }
+        // Temporary connections are session-scoped; store secrets in an 
in-memory store rather
+        // than the configured (potentially persistent) writableSecretStore.
+        final ConnectionFactory connectionFactory =
+                discoverConnectionFactory(connection.getOptions());
+        final CatalogConnection catalogConnection =
+                connectionFactory.createConnection(connection, 
temporarySecretStore);
+        temporaryConnections.put(objectIdentifier, catalogConnection);
+        Catalog catalog = 
getCatalog(objectIdentifier.getCatalogName()).orElse(null);
+        catalogModificationListeners.forEach(
+                listener ->
+                        listener.onEvent(
+                                CreateConnectionEvent.createEvent(
+                                        CatalogContext.createContext(
+                                                
objectIdentifier.getCatalogName(), catalog),
+                                        objectIdentifier,
+                                        catalogConnection,
+                                        ignoreIfExists,
+                                        true)));
+    }
+
+    /**
+     * Alter a connection in the given fully qualified path.
+     *
+     * @param newConnection The new connection containing changes.
+     * @param objectIdentifier The fully qualified path where to alter the 
connection.
+     * @param ignoreIfNotExists If false exception will be thrown if the 
connection to be altered
+     *     does not exist.
+     */
+    public void alterConnection(
+            SensitiveConnection newConnection,
+            ObjectIdentifier objectIdentifier,
+            boolean ignoreIfNotExists) {
+        if (writableSecretStore == null) {
+            throw new ValidationException(
+                    "WritableSecretStore must be configured to alter 
connections.");
+        }
+        Optional<CatalogConnection> existingOpt = 
getConnection(objectIdentifier);
+        if (!existingOpt.isPresent()) {
+            if (ignoreIfNotExists) {
+                return;
+            }
+            throw new ValidationException(
+                    String.format(
+                            "Connection with identifier '%s' does not exist.",
+                            objectIdentifier.asSummaryString()));
+        }
+        final CatalogConnection existing = existingOpt.get();
+        final ConnectionFactory connectionFactory =
+                discoverConnectionFactory(newConnection.getOptions());
+        final CatalogConnection newCatalogConnection =
+                connectionFactory.createConnection(newConnection, 
writableSecretStore);
+        boolean persisted = false;
+        try {
+            execute(
+                    (catalog, path) -> {
+                        catalog.alterConnection(path, newCatalogConnection, 
ignoreIfNotExists);
+                        catalogModificationListeners.forEach(
+                                listener ->
+                                        listener.onEvent(
+                                                
AlterConnectionEvent.createEvent(
+                                                        
CatalogContext.createContext(
+                                                                
objectIdentifier.getCatalogName(),
+                                                                catalog),
+                                                        objectIdentifier,
+                                                        newCatalogConnection,
+                                                        ignoreIfNotExists)));
+                    },
+                    objectIdentifier,
+                    ignoreIfNotExists,
+                    "AlterConnection");
+            persisted = true;
+        } finally {
+            // On success: drop the OLD secret. On failure: drop the 
freshly-stored NEW secret.
+            tryDeleteSecrets(
+                    persisted ? existing : newCatalogConnection,
+                    writableSecretStore,
+                    persisted
+                            ? "post-alter cleanup of old secret for " + 
objectIdentifier
+                            : "rollback alterConnection " + objectIdentifier);
+        }
+    }
+
+    /**
+     * Drop a permanent connection from the given fully qualified path.
+     *
+     * @param objectIdentifier The fully qualified path of the connection to 
be dropped.
+     * @param ignoreIfNotExists If false exception will be thrown if the 
connection to be dropped
+     *     does not exist.
+     */
+    public void dropConnection(ObjectIdentifier objectIdentifier, boolean 
ignoreIfNotExists) {
+        Optional<CatalogConnection> existingOpt = 
getConnection(objectIdentifier);
+        if (!existingOpt.isPresent()) {
+            if (ignoreIfNotExists) {
+                return;
+            }
+            throw new ValidationException(
+                    String.format(
+                            "Connection with identifier '%s' does not exist.",
+                            objectIdentifier.asSummaryString()));
+        }
+        final CatalogConnection existing = existingOpt.get();
+        execute(
+                (catalog, path) -> {
+                    catalog.dropConnection(path, ignoreIfNotExists);
+                    catalogModificationListeners.forEach(
+                            listener ->
+                                    listener.onEvent(
+                                            DropConnectionEvent.createEvent(
+                                                    
CatalogContext.createContext(
+                                                            
objectIdentifier.getCatalogName(),
+                                                            catalog),
+                                                    objectIdentifier,
+                                                    existing,
+                                                    ignoreIfNotExists,
+                                                    false)));
+                },
+                objectIdentifier,
+                ignoreIfNotExists,
+                "DropConnection");
+        if (writableSecretStore != null) {
+            tryDeleteSecrets(
+                    existing, writableSecretStore, "post-drop cleanup for " + 
objectIdentifier);
+        }
+    }
+
+    /**
+     * Drop a temporary connection from the given fully qualified path.
+     *
+     * @param objectIdentifier The fully qualified path of the connection to 
be dropped.
+     * @param ignoreIfNotExists If false exception will be thrown if the 
connection to be dropped
+     *     does not exist.
+     */
+    public void dropTemporaryConnection(
+            ObjectIdentifier objectIdentifier, boolean ignoreIfNotExists) {
+        CatalogConnection connection = 
temporaryConnections.get(objectIdentifier);
+        if (connection != null) {
+            temporaryConnections.remove(objectIdentifier);
+            Catalog catalog = 
getCatalog(objectIdentifier.getCatalogName()).orElse(null);
+            catalogModificationListeners.forEach(
+                    listener ->
+                            listener.onEvent(
+                                    DropConnectionEvent.createEvent(
+                                            CatalogContext.createContext(
+                                                    
objectIdentifier.getCatalogName(), catalog),
+                                            objectIdentifier,
+                                            connection,
+                                            ignoreIfNotExists,
+                                            true)));
+            tryDeleteSecrets(
+                    connection,
+                    temporarySecretStore,
+                    "post-drop cleanup of temporary " + objectIdentifier);
+        } else if (!ignoreIfNotExists) {
+            throw new ValidationException(
+                    String.format(
+                            "Temporary connection with identifier '%s' does 
not exist.",
+                            objectIdentifier.asSummaryString()));
+        }
+    }
+
+    /**
+     * Best-effort cleanup of a connection's secrets. The catalog state has 
already been mutated (or
+     * failed); a cleanup failure should not mask the user-visible result. 
Logs the failure (which
+     * may indicate an orphaned secret in the underlying store) and swallows 
the exception.
+     */
+    private void tryDeleteSecrets(
+            CatalogConnection connection, WritableSecretStore store, String 
context) {
+        try {
+            
discoverConnectionFactory(connection.getOptions()).deleteSecrets(connection, 
store);
+        } catch (SecretException | ValidationException e) {
+            LOG.warn(
+                    "Failed to delete connection secrets during {}; the 
catalog state is correct, but the secret may be orphaned in the secret store.",
+                    context,
+                    e);
+        }
+    }
+
     /**
      * A command that modifies given {@link Catalog} in an {@link ObjectPath}. 
This unifies error
      * handling across different commands.
@@ -1813,6 +2190,8 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
                     | TableNotExistException
                     | ModelNotExistException
                     | ModelAlreadyExistException
+                    | ConnectionNotExistException
+                    | ConnectionAlreadyExistException
                     | DatabaseNotExistException e) {
                 throw new 
ValidationException(getErrorMessage(objectIdentifier, commandName), e);
             } catch (Exception e) {
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
index 15c8f00dc91..bc678672d91 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.catalog;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
+import 
org.apache.flink.table.catalog.exceptions.ConnectionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.ConnectionNotExistException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
@@ -59,6 +61,7 @@ public class GenericInMemoryCatalog extends AbstractCatalog {
     private final Map<String, CatalogDatabase> databases;
     private final Map<ObjectPath, CatalogBaseTable> tables;
     private final Map<ObjectPath, CatalogModel> models;
+    private final Map<ObjectPath, CatalogConnection> connections;
     private final Map<ObjectPath, CatalogFunction> functions;
     private final Map<ObjectPath, Map<CatalogPartitionSpec, CatalogPartition>> 
partitions;
 
@@ -79,6 +82,7 @@ public class GenericInMemoryCatalog extends AbstractCatalog {
         this.databases.put(defaultDatabase, new CatalogDatabaseImpl(new 
HashMap<>(), null));
         this.tables = new LinkedHashMap<>();
         this.models = new LinkedHashMap<>();
+        this.connections = new LinkedHashMap<>();
         this.functions = new LinkedHashMap<>();
         this.partitions = new LinkedHashMap<>();
         this.tableStats = new LinkedHashMap<>();
@@ -453,6 +457,78 @@ public class GenericInMemoryCatalog extends 
AbstractCatalog {
         return databaseExists(modelPath.getDatabaseName()) && 
models.containsKey(modelPath);
     }
 
+    // ------ connections ------
+
+    @Override
+    public void createConnection(
+            ObjectPath connectionPath, CatalogConnection connection, boolean 
ignoreIfExists)
+            throws ConnectionAlreadyExistException, DatabaseNotExistException {
+        checkNotNull(connectionPath);
+        checkNotNull(connection);
+        if (!databaseExists(connectionPath.getDatabaseName())) {
+            throw new DatabaseNotExistException(getName(), 
connectionPath.getDatabaseName());
+        }
+        if (connectionExists(connectionPath)) {
+            if (!ignoreIfExists) {
+                throw new ConnectionAlreadyExistException(getName(), 
connectionPath);
+            }
+        } else {
+            connections.put(connectionPath, connection.copy());
+        }
+    }
+
+    @Override
+    public void alterConnection(
+            ObjectPath connectionPath, CatalogConnection newConnection, 
boolean ignoreIfNotExists)
+            throws ConnectionNotExistException {
+        checkNotNull(connectionPath);
+        checkNotNull(newConnection);
+
+        if (!connectionExists(connectionPath)) {
+            if (ignoreIfNotExists) {
+                return;
+            }
+            throw new ConnectionNotExistException(getName(), connectionPath);
+        }
+
+        connections.put(connectionPath, newConnection.copy());
+    }
+
+    @Override
+    public void dropConnection(ObjectPath connectionPath, boolean 
ignoreIfNotExists)
+            throws ConnectionNotExistException {
+        checkNotNull(connectionPath);
+        if (connectionExists(connectionPath)) {
+            connections.remove(connectionPath);
+        } else if (!ignoreIfNotExists) {
+            throw new ConnectionNotExistException(getName(), connectionPath);
+        }
+    }
+
+    @Override
+    public List<String> listConnections(String databaseName) throws 
DatabaseNotExistException {
+        return listObjectsUnderDatabase(connections, databaseName, k -> true);
+    }
+
+    @Override
+    public CatalogConnection getConnection(ObjectPath connectionPath)
+            throws ConnectionNotExistException {
+        checkNotNull(connectionPath);
+
+        if (!connectionExists(connectionPath)) {
+            throw new ConnectionNotExistException(getName(), connectionPath);
+        } else {
+            return connections.get(connectionPath).copy();
+        }
+    }
+
+    @Override
+    public boolean connectionExists(ObjectPath connectionPath) {
+        checkNotNull(connectionPath);
+        return databaseExists(connectionPath.getDatabaseName())
+                && connections.containsKey(connectionPath);
+    }
+
     // ------ functions ------
 
     @Override
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/AlterConnectionEvent.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/AlterConnectionEvent.java
new file mode 100644
index 00000000000..475a877386f
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/AlterConnectionEvent.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.listener;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.CatalogConnection;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+/** When a connection is altered, a {@link AlterConnectionEvent} event will be 
created and fired. */
+@PublicEvolving
+public interface AlterConnectionEvent extends ConnectionModificationEvent {
+    ObjectIdentifier identifier();
+
+    CatalogConnection newConnection();
+
+    boolean ignoreIfNotExists();
+
+    static AlterConnectionEvent createEvent(
+            final CatalogContext context,
+            final ObjectIdentifier identifier,
+            final CatalogConnection newConnection,
+            final boolean ignoreIfNotExists) {
+        return new AlterConnectionEvent() {
+            @Override
+            public CatalogConnection newConnection() {
+                return newConnection;
+            }
+
+            @Override
+            public boolean ignoreIfNotExists() {
+                return ignoreIfNotExists;
+            }
+
+            @Override
+            public ObjectIdentifier identifier() {
+                return identifier;
+            }
+
+            @Override
+            public CatalogConnection connection() {
+                throw new IllegalStateException(
+                        "There is no connection in AlterConnectionEvent, use 
identifier() instead.");
+            }
+
+            @Override
+            public boolean isTemporary() {
+                return false;
+            }
+
+            @Override
+            public CatalogContext context() {
+                return context;
+            }
+        };
+    }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/ConnectionModificationEvent.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/ConnectionModificationEvent.java
new file mode 100644
index 00000000000..98609622f88
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/ConnectionModificationEvent.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.listener;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.CatalogConnection;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+/** Basic event for connection modification such as create, alter and drop. */
+@PublicEvolving
+public interface ConnectionModificationEvent extends CatalogModificationEvent {
+
+    ObjectIdentifier identifier();
+
+    CatalogConnection connection();
+
+    boolean isTemporary();
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/CreateConnectionEvent.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/CreateConnectionEvent.java
new file mode 100644
index 00000000000..21a5297132b
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/CreateConnectionEvent.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.listener;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.CatalogConnection;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+/**
+ * When a connection is created, a {@link CreateConnectionEvent} event will be 
created and fired.
+ */
+@PublicEvolving
+public interface CreateConnectionEvent extends ConnectionModificationEvent {
+    ObjectIdentifier identifier();
+
+    CatalogConnection connection();
+
+    boolean ignoreIfExists();
+
+    boolean isTemporary();
+
+    static CreateConnectionEvent createEvent(
+            final CatalogContext context,
+            final ObjectIdentifier identifier,
+            final CatalogConnection connection,
+            final boolean ignoreIfExists,
+            final boolean isTemporary) {
+        return new CreateConnectionEvent() {
+            @Override
+            public boolean ignoreIfExists() {
+                return ignoreIfExists;
+            }
+
+            @Override
+            public ObjectIdentifier identifier() {
+                return identifier;
+            }
+
+            @Override
+            public CatalogConnection connection() {
+                return connection;
+            }
+
+            @Override
+            public CatalogContext context() {
+                return context;
+            }
+
+            @Override
+            public boolean isTemporary() {
+                return isTemporary;
+            }
+        };
+    }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/DropConnectionEvent.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/DropConnectionEvent.java
new file mode 100644
index 00000000000..3affd30326e
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/DropConnectionEvent.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.listener;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.CatalogConnection;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+import javax.annotation.Nullable;
+
+/** When a connection is dropped, a {@link DropConnectionEvent} event will be 
created and fired. */
+@PublicEvolving
+public interface DropConnectionEvent extends ConnectionModificationEvent {
+    ObjectIdentifier identifier();
+
+    boolean ignoreIfNotExists();
+
+    boolean isTemporary();
+
+    CatalogConnection connection();
+
+    static DropConnectionEvent createEvent(
+            final CatalogContext context,
+            final ObjectIdentifier identifier,
+            @Nullable final CatalogConnection connection,
+            final boolean ignoreIfNotExists,
+            final boolean isTemporary) {
+        return new DropConnectionEvent() {
+            @Override
+            public boolean ignoreIfNotExists() {
+                return ignoreIfNotExists;
+            }
+
+            @Override
+            public ObjectIdentifier identifier() {
+                return identifier;
+            }
+
+            @Override
+            @Nullable
+            public CatalogConnection connection() {
+                return connection;
+            }
+
+            @Override
+            public CatalogContext context() {
+                return context;
+            }
+
+            @Override
+            public boolean isTemporary() {
+                return isTemporary;
+            }
+        };
+    }
+}
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java
index 4b79b18c9d1..907d03dfcf8 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java
@@ -22,17 +22,22 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.listener.AlterConnectionEvent;
 import org.apache.flink.table.catalog.listener.AlterDatabaseEvent;
 import org.apache.flink.table.catalog.listener.AlterModelEvent;
 import org.apache.flink.table.catalog.listener.AlterTableEvent;
 import org.apache.flink.table.catalog.listener.CatalogModificationEvent;
 import org.apache.flink.table.catalog.listener.CatalogModificationListener;
+import org.apache.flink.table.catalog.listener.CreateConnectionEvent;
 import org.apache.flink.table.catalog.listener.CreateDatabaseEvent;
 import org.apache.flink.table.catalog.listener.CreateModelEvent;
 import org.apache.flink.table.catalog.listener.CreateTableEvent;
+import org.apache.flink.table.catalog.listener.DropConnectionEvent;
 import org.apache.flink.table.catalog.listener.DropDatabaseEvent;
 import org.apache.flink.table.catalog.listener.DropModelEvent;
 import org.apache.flink.table.catalog.listener.DropTableEvent;
+import org.apache.flink.table.secret.GenericInMemorySecretStore;
+import org.apache.flink.table.secret.WritableSecretStore;
 import org.apache.flink.table.utils.CatalogManagerMocks;
 import org.apache.flink.table.utils.ExpressionResolverMocks;
 import org.apache.flink.table.utils.ParserMock;
@@ -365,6 +370,180 @@ class CatalogManagerTest {
         
assertThat(dropTemporaryEvent.identifier().getObjectName()).isEqualTo("model2");
     }
 
+    @Test
+    public void testConnectionModificationListener() throws Exception {
+        CompletableFuture<CreateConnectionEvent> createFuture = new 
CompletableFuture<>();
+        CompletableFuture<CreateConnectionEvent> createTemporaryFuture = new 
CompletableFuture<>();
+        CompletableFuture<AlterConnectionEvent> alterFuture = new 
CompletableFuture<>();
+        CompletableFuture<DropConnectionEvent> dropFuture = new 
CompletableFuture<>();
+        CompletableFuture<DropConnectionEvent> dropTemporaryFuture = new 
CompletableFuture<>();
+        WritableSecretStore secretStore = new GenericInMemorySecretStore();
+        CatalogManager catalogManager =
+                CatalogManagerMocks.preparedCatalogManager()
+                        .defaultCatalog("default", new 
GenericInMemoryCatalog("default"))
+                        .classLoader(CatalogManagerTest.class.getClassLoader())
+                        .config(new Configuration())
+                        .catalogModificationListeners(
+                                Collections.singletonList(
+                                        new 
TestingConnectionModificationListener(
+                                                createFuture,
+                                                createTemporaryFuture,
+                                                alterFuture,
+                                                dropFuture,
+                                                dropTemporaryFuture)))
+                        .catalogStoreHolder(
+                                CatalogStoreHolder.newBuilder()
+                                        
.classloader(CatalogManagerTest.class.getClassLoader())
+                                        .catalogStore(new 
GenericInMemoryCatalogStore())
+                                        .config(new Configuration())
+                                        .build())
+                        .writableSecretStore(secretStore)
+                        .build();
+
+        catalogManager.initSchemaResolver(
+                true, ExpressionResolverMocks.dummyResolver(), new 
ParserMock());
+
+        HashMap<String, String> options =
+                new HashMap<String, String>() {
+                    {
+                        put("type", "default");
+                        put("bootstrap.servers", "localhost:9092");
+                        put("password", "secret-pw");
+                    }
+                };
+
+        // Create a connection
+        catalogManager.createConnection(
+                SensitiveConnection.of(options, null),
+                ObjectIdentifier.of(
+                        catalogManager.getCurrentCatalog(),
+                        catalogManager.getCurrentDatabase(),
+                        "conn1"),
+                true);
+        CreateConnectionEvent createConnectionEvent = createFuture.get(10, 
TimeUnit.SECONDS);
+        
assertThat(createConnectionEvent.identifier().getObjectName()).isEqualTo("conn1");
+        assertThat(createConnectionEvent.ignoreIfExists()).isTrue();
+        assertThat(createConnectionEvent.isTemporary()).isFalse();
+        // Sensitive field should be stripped from the persisted 
CatalogConnection
+        
assertThat(createConnectionEvent.connection().getOptions()).doesNotContainKey("password");
+
+        // Create a temporary connection
+        catalogManager.createTemporaryConnection(
+                SensitiveConnection.of(options, null),
+                ObjectIdentifier.of(
+                        catalogManager.getCurrentCatalog(),
+                        catalogManager.getCurrentDatabase(),
+                        "conn2"),
+                false);
+        CreateConnectionEvent createTemporaryEvent =
+                createTemporaryFuture.get(10, TimeUnit.SECONDS);
+        assertThat(createTemporaryEvent.isTemporary()).isTrue();
+        
assertThat(createTemporaryEvent.identifier().getObjectName()).isEqualTo("conn2");
+        assertThat(createTemporaryEvent.ignoreIfExists()).isFalse();
+
+        // Alter a connection
+        HashMap<String, String> alteredOptions =
+                new HashMap<String, String>() {
+                    {
+                        put("type", "default");
+                        put("bootstrap.servers", "remote:9092");
+                        put("password", "rotated-pw");
+                    }
+                };
+        catalogManager.alterConnection(
+                SensitiveConnection.of(alteredOptions, "conn1 comment"),
+                ObjectIdentifier.of(
+                        catalogManager.getCurrentCatalog(),
+                        catalogManager.getCurrentDatabase(),
+                        "conn1"),
+                false);
+        AlterConnectionEvent alterEvent = alterFuture.get(10, 
TimeUnit.SECONDS);
+        assertThat(alterEvent.identifier().getObjectName()).isEqualTo("conn1");
+        assertThat(alterEvent.newConnection().getComment()).isEqualTo("conn1 
comment");
+        
assertThat(alterEvent.newConnection().getOptions().get("bootstrap.servers"))
+                .isEqualTo("remote:9092");
+        
assertThat(alterEvent.newConnection().getOptions()).doesNotContainKey("password");
+        assertThat(alterEvent.ignoreIfNotExists()).isFalse();
+
+        // Drop a connection
+        ObjectIdentifier oi =
+                ObjectIdentifier.of(
+                        catalogManager.getCurrentCatalog(),
+                        catalogManager.getCurrentDatabase(),
+                        "conn1");
+        catalogManager.dropConnection(oi, true);
+        DropConnectionEvent dropEvent = dropFuture.get(10, TimeUnit.SECONDS);
+        assertThat(dropEvent.ignoreIfNotExists()).isTrue();
+        assertThat(dropEvent.identifier().getObjectName()).isEqualTo("conn1");
+        assertThat(dropEvent.isTemporary()).isFalse();
+
+        // Drop a temporary connection
+        catalogManager.dropTemporaryConnection(
+                ObjectIdentifier.of(
+                        catalogManager.getCurrentCatalog(),
+                        catalogManager.getCurrentDatabase(),
+                        "conn2"),
+                false);
+        DropConnectionEvent dropTemporaryEvent = dropTemporaryFuture.get(10, 
TimeUnit.SECONDS);
+        assertThat(dropTemporaryEvent.isTemporary()).isTrue();
+        assertThat(dropTemporaryEvent.ignoreIfNotExists()).isFalse();
+        
assertThat(dropTemporaryEvent.identifier().getObjectName()).isEqualTo("conn2");
+    }
+
+    @Test
+    public void testCreateConnectionWithoutTypeFallsBackToDefaultFactory() 
throws Exception {
+        CompletableFuture<CreateConnectionEvent> createFuture = new 
CompletableFuture<>();
+        WritableSecretStore secretStore = new GenericInMemorySecretStore();
+        CatalogManager catalogManager =
+                CatalogManagerMocks.preparedCatalogManager()
+                        .defaultCatalog("default", new 
GenericInMemoryCatalog("default"))
+                        .classLoader(CatalogManagerTest.class.getClassLoader())
+                        .config(new Configuration())
+                        .catalogModificationListeners(
+                                Collections.singletonList(
+                                        new 
TestingConnectionModificationListener(
+                                                createFuture,
+                                                new CompletableFuture<>(),
+                                                new CompletableFuture<>(),
+                                                new CompletableFuture<>(),
+                                                new CompletableFuture<>())))
+                        .catalogStoreHolder(
+                                CatalogStoreHolder.newBuilder()
+                                        
.classloader(CatalogManagerTest.class.getClassLoader())
+                                        .catalogStore(new 
GenericInMemoryCatalogStore())
+                                        .config(new Configuration())
+                                        .build())
+                        .writableSecretStore(secretStore)
+                        .build();
+
+        catalogManager.initSchemaResolver(
+                true, ExpressionResolverMocks.dummyResolver(), new 
ParserMock());
+
+        // Omit the 'type' option entirely; discovery should fall back to 
DefaultConnectionFactory.
+        HashMap<String, String> options =
+                new HashMap<String, String>() {
+                    {
+                        put("bootstrap.servers", "localhost:9092");
+                        put("password", "secret-pw");
+                    }
+                };
+
+        catalogManager.createConnection(
+                SensitiveConnection.of(options, null),
+                ObjectIdentifier.of(
+                        catalogManager.getCurrentCatalog(),
+                        catalogManager.getCurrentDatabase(),
+                        "conn-no-type"),
+                false);
+
+        CreateConnectionEvent event = createFuture.get(10, TimeUnit.SECONDS);
+        
assertThat(event.identifier().getObjectName()).isEqualTo("conn-no-type");
+        // Sensitive field stripped — proves DefaultConnectionFactory ran via 
the fallback path.
+        
assertThat(event.connection().getOptions()).doesNotContainKey("password");
+        assertThat(event.connection().getOptions())
+                .containsEntry("bootstrap.servers", "localhost:9092");
+    }
+
     private CatalogManager createCatalogManager(@Nullable 
CatalogModificationListener listener) {
         CatalogManager.Builder builder =
                 CatalogManager.newBuilder()
@@ -457,6 +636,49 @@ class CatalogManagerTest {
         }
     }
 
+    /** Testing connection modification listener. */
+    static class TestingConnectionModificationListener implements 
CatalogModificationListener {
+        private final CompletableFuture<CreateConnectionEvent> createFuture;
+        private final CompletableFuture<CreateConnectionEvent> 
createTemporaryFuture;
+        private final CompletableFuture<AlterConnectionEvent> alterFuture;
+        private final CompletableFuture<DropConnectionEvent> dropFuture;
+        private final CompletableFuture<DropConnectionEvent> 
dropTemporaryFuture;
+
+        TestingConnectionModificationListener(
+                CompletableFuture<CreateConnectionEvent> createFuture,
+                CompletableFuture<CreateConnectionEvent> createTemporaryFuture,
+                CompletableFuture<AlterConnectionEvent> alterFuture,
+                CompletableFuture<DropConnectionEvent> dropFuture,
+                CompletableFuture<DropConnectionEvent> dropTemporaryFuture) {
+            this.createFuture = createFuture;
+            this.createTemporaryFuture = createTemporaryFuture;
+            this.alterFuture = alterFuture;
+            this.dropFuture = dropFuture;
+            this.dropTemporaryFuture = dropTemporaryFuture;
+        }
+
+        @Override
+        public void onEvent(CatalogModificationEvent event) {
+            if (event instanceof CreateConnectionEvent) {
+                if (((CreateConnectionEvent) event).isTemporary()) {
+                    createTemporaryFuture.complete((CreateConnectionEvent) 
event);
+                } else {
+                    createFuture.complete((CreateConnectionEvent) event);
+                }
+            } else if (event instanceof AlterConnectionEvent) {
+                alterFuture.complete((AlterConnectionEvent) event);
+            } else if (event instanceof DropConnectionEvent) {
+                if (((DropConnectionEvent) event).isTemporary()) {
+                    dropTemporaryFuture.complete((DropConnectionEvent) event);
+                } else {
+                    dropFuture.complete((DropConnectionEvent) event);
+                }
+            } else {
+                throw new UnsupportedOperationException();
+            }
+        }
+    }
+
     /** Testing model modification listener. */
     static class TestingModelModificationListener implements 
CatalogModificationListener {
         private final CompletableFuture<CreateModelEvent> createFuture;
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
index 52a7cd7ccb5..403d94459fb 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
@@ -255,6 +255,11 @@ class GenericInMemoryCatalogTest extends CatalogTestBase {
         return true;
     }
 
+    @Override
+    protected boolean supportsConnections() {
+        return true;
+    }
+
     @Override
     protected CatalogFunction createPythonFunction() {
         return new CatalogFunctionImpl("test.func1", FunctionLanguage.PYTHON);
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
index e230b616cdb..3db7d7cf2ee 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.catalog;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
+import 
org.apache.flink.table.catalog.exceptions.ConnectionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.ConnectionNotExistException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
@@ -932,4 +934,102 @@ public interface Catalog {
             throws ModelNotExistException, CatalogException {
         alterModel(modelPath, newModel, ignoreIfNotExists);
     }
+
+    // ------ connections ------
+
+    /**
+     * Get names of all connections under this database. An empty list is 
returned if none exists.
+     *
+     * @return a list of the names of all connections in this database
+     * @throws DatabaseNotExistException if the database does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    default List<String> listConnections(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        return Collections.emptyList();
+    }
+
+    /**
+     * Returns a {@link CatalogConnection} identified by the given {@link 
ObjectPath}.
+     *
+     * @param connectionPath Path of the connection
+     * @return The requested connection
+     * @throws ConnectionNotExistException if the target does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    default CatalogConnection getConnection(ObjectPath connectionPath)
+            throws ConnectionNotExistException, CatalogException {
+        throw new UnsupportedOperationException(
+                String.format(
+                        "getConnection(ObjectPath) is not implemented for 
%s.", this.getClass()));
+    }
+
+    /**
+     * Check if a connection exists in this catalog.
+     *
+     * @param connectionPath Path of the connection
+     * @return true if the given connection exists in the catalog false 
otherwise
+     * @throws CatalogException in case of any runtime exception
+     */
+    default boolean connectionExists(ObjectPath connectionPath) throws 
CatalogException {
+        return false;
+    }
+
+    /**
+     * Creates a new connection.
+     *
+     * @param connectionPath path of the connection to be created
+     * @param connection the CatalogConnection definition
+     * @param ignoreIfExists flag to specify behavior when a connection 
already exists at the given
+     *     path: if set to false, it throws a ConnectionAlreadyExistException, 
if set to true, do
+     *     nothing.
+     * @throws ConnectionAlreadyExistException if connection already exists 
and ignoreIfExists is
+     *     false
+     * @throws DatabaseNotExistException if the database in connectionPath 
doesn't exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    default void createConnection(
+            ObjectPath connectionPath, CatalogConnection connection, boolean 
ignoreIfExists)
+            throws ConnectionAlreadyExistException, DatabaseNotExistException, 
CatalogException {
+        throw new UnsupportedOperationException(
+                String.format(
+                        "createConnection(ObjectPath, CatalogConnection, 
boolean) is not implemented for %s.",
+                        this.getClass()));
+    }
+
+    /**
+     * Modifies an existing connection.
+     *
+     * @param connectionPath path of the connection to be modified
+     * @param newConnection the new connection definition
+     * @param ignoreIfNotExists flag to specify behavior when the connection 
does not exist: if set
+     *     to false, throw an exception, if set to true, do nothing.
+     * @throws ConnectionNotExistException if the connection does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    default void alterConnection(
+            ObjectPath connectionPath, CatalogConnection newConnection, 
boolean ignoreIfNotExists)
+            throws ConnectionNotExistException, CatalogException {
+        throw new UnsupportedOperationException(
+                String.format(
+                        "alterConnection(ObjectPath, CatalogConnection, 
boolean) is not implemented for %s.",
+                        this.getClass()));
+    }
+
+    /**
+     * Drop a connection.
+     *
+     * @param connectionPath Path of the connection to be dropped
+     * @param ignoreIfNotExists Flag to specify behavior when the connection 
does not exist: if set
+     *     to false, throw an exception, if set to true, do nothing.
+     * @throws ConnectionNotExistException if the connection does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    default void dropConnection(ObjectPath connectionPath, boolean 
ignoreIfNotExists)
+            throws ConnectionNotExistException, CatalogException {
+        throw new UnsupportedOperationException(
+                String.format(
+                        "dropConnection(ObjectPath, boolean) is not 
implemented for %s.",
+                        this.getClass()));
+    }
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/ConnectionAlreadyExistException.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/ConnectionAlreadyExistException.java
new file mode 100644
index 00000000000..5732f470f80
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/ConnectionAlreadyExistException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.exceptions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.ObjectPath;
+
+/** Exception for trying to create a connection that already exists. */
+@PublicEvolving
+public class ConnectionAlreadyExistException extends Exception {
+
+    private static final String MSG = "Connection '%s' already exists in 
catalog '%s'.";
+
+    public ConnectionAlreadyExistException(String catalogName, ObjectPath 
connectionPath) {
+        this(catalogName, connectionPath, null);
+    }
+
+    public ConnectionAlreadyExistException(
+            String catalogName, ObjectPath connectionPath, Throwable cause) {
+        super(String.format(MSG, connectionPath.getFullName(), catalogName), 
cause);
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/ConnectionNotExistException.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/ConnectionNotExistException.java
new file mode 100644
index 00000000000..72544f55173
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/ConnectionNotExistException.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.exceptions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.ObjectPath;
+
+/** Exception for trying to operate on a connection that doesn't exist. */
+@PublicEvolving
+public class ConnectionNotExistException extends Exception {
+
+    private static final String MSG = "Connection '%s' does not exist in 
catalog '%s'.";
+    private static final String MSG_WITHOUT_CATALOG = "Connection '%s' does 
not exist.";
+
+    public ConnectionNotExistException(String catalogName, ObjectPath 
connectionPath) {
+        this(catalogName, connectionPath, null);
+    }
+
+    public ConnectionNotExistException(
+            String catalogName, ObjectPath connectionPath, Throwable cause) {
+        super(formatMsg(catalogName, connectionPath), cause);
+    }
+
+    private static String formatMsg(String catalogName, ObjectPath 
connectionPath) {
+        if (catalogName != null) {
+            return String.format(MSG, connectionPath.getFullName(), 
catalogName);
+        }
+        return String.format(MSG_WITHOUT_CATALOG, 
connectionPath.getFullName());
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ConnectionFactory.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ConnectionFactory.java
new file mode 100644
index 00000000000..25d0bf3c2b8
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ConnectionFactory.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.CatalogConnection;
+import org.apache.flink.table.catalog.SensitiveConnection;
+import org.apache.flink.table.secret.ReadableSecretStore;
+import org.apache.flink.table.secret.WritableSecretStore;
+import org.apache.flink.table.secret.exceptions.SecretException;
+
+/**
+ * Factory for creating and resolving connections, handling the encryption and 
decryption of
+ * sensitive connection fields.
+ *
+ * <p>A {@code ConnectionFactory} is responsible for:
+ *
+ * <ul>
+ *   <li>Extracting sensitive fields from a {@link SensitiveConnection} and 
storing them in a {@link
+ *       WritableSecretStore}, returning a {@link CatalogConnection} that is 
safe to persist in a
+ *       catalog.
+ *   <li>Resolving a {@link CatalogConnection} from a catalog by retrieving 
its secrets from a
+ *       {@link ReadableSecretStore} and returning a complete {@link 
SensitiveConnection}.
+ * </ul>
+ *
+ * @see DefaultConnectionFactory
+ */
+@PublicEvolving
+public interface ConnectionFactory extends Factory {
+
+    /**
+     * Creates a {@link CatalogConnection} from a {@link SensitiveConnection} 
by extracting
+     * sensitive fields and storing them in the provided {@link 
WritableSecretStore}.
+     *
+     * <p>The returned {@link CatalogConnection} contains only non-sensitive 
options plus a secret
+     * reference that can be used to retrieve the sensitive fields later via 
{@link
+     * #resolveConnection(CatalogConnection, ReadableSecretStore)}.
+     *
+     * @param connection the connection with all options including sensitive 
fields
+     * @param secretStore the secret store where sensitive fields will be 
stored
+     * @return a catalog-safe connection with sensitive fields replaced by a 
secret reference
+     * @throws SecretException if storing the secret fails (e.g. 
underlying-store error)
+     */
+    CatalogConnection createConnection(
+            SensitiveConnection connection, WritableSecretStore secretStore) 
throws SecretException;
+
+    /**
+     * Resolves a {@link CatalogConnection} into a {@link SensitiveConnection} 
by retrieving secrets
+     * from the provided {@link ReadableSecretStore}.
+     *
+     * @param connection the catalog connection containing non-sensitive 
options and a secret
+     *     reference
+     * @param secretStore the secret store from which sensitive fields are 
retrieved
+     * @return the complete connection with all options including sensitive 
fields
+     * @throws SecretException if retrieving the secret fails (e.g. 
underlying-store error)
+     */
+    SensitiveConnection resolveConnection(
+            CatalogConnection connection, ReadableSecretStore secretStore) 
throws SecretException;
+
+    /**
+     * Deletes any secrets associated with the given {@link CatalogConnection} 
from the provided
+     * {@link WritableSecretStore}.
+     *
+     * <p>Implementations should locate the secret reference embedded in the 
connection (created by
+     * {@link #createConnection(SensitiveConnection, WritableSecretStore)}) 
and remove the
+     * corresponding entry from the secret store. This is intended to be 
called when a connection is
+     * dropped or replaced (e.g. on alter), to avoid orphaned secrets.
+     *
+     * <p>The default implementation is a no-op for factories that do not 
externalize secrets.
+     *
+     * @param connection the catalog connection whose backing secrets should 
be removed
+     * @param secretStore the secret store from which secrets should be deleted
+     * @throws SecretException if removing the secret fails (e.g. 
underlying-store error)
+     */
+    default void deleteSecrets(CatalogConnection connection, 
WritableSecretStore secretStore)
+            throws SecretException {}
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DefaultConnectionFactory.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DefaultConnectionFactory.java
new file mode 100644
index 00000000000..236e8c6a5f4
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DefaultConnectionFactory.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogConnection;
+import org.apache.flink.table.catalog.SensitiveConnection;
+import org.apache.flink.table.secret.ReadableSecretStore;
+import org.apache.flink.table.secret.WritableSecretStore;
+import org.apache.flink.table.secret.exceptions.SecretException;
+import org.apache.flink.table.secret.exceptions.SecretNotFoundException;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Default implementation of {@link ConnectionFactory} that identifies 
sensitive fields by a
+ * predefined whitelist of field names.
+ *
+ * <p>During {@link #createConnection}, sensitive fields are extracted from 
the connection options
+ * and stored as a single secret in the {@link WritableSecretStore}. A 
reference key ({@value
+ * #SECRET_REFERENCE_KEY}) pointing to the stored secret is added to the 
returned {@link
+ * CatalogConnection}.
+ *
+ * <p>During {@link #resolveConnection}, the secret reference is used to 
retrieve the sensitive
+ * fields from the {@link ReadableSecretStore} and merge them back into the 
options.
+ *
+ * <p>See {@link #SENSITIVE_FIELD_NAMES} for the default whitelist of 
sensitive field names.
+ */
+@Internal
+public class DefaultConnectionFactory implements ConnectionFactory {
+
+    /**
+     * Factory identifier used to discover this factory via SPI. Also used as 
the fallback when a
+     * connection does not specify a {@link FactoryUtil#CONNECTION_TYPE} 
option.
+     */
+    public static final String IDENTIFIER = "default";
+
+    /**
+     * Reserved option key used to store the reference to secrets in the 
secret store. The
+     * surrounding double underscores make collision with user-supplied option 
names unlikely; user
+     * options containing this key will be rejected at create-time.
+     */
+    static final String SECRET_REFERENCE_KEY = 
"__flink.encrypted-secret-key__";
+
+    /**
+     * Default whitelist of option keys treated as sensitive. Seeded from 
{@link
+     * org.apache.flink.configuration.GlobalConfiguration#SENSITIVE_KEYS}; the 
list is intentionally
+     * small to start and can be expanded over time as new sensitive options 
are introduced.
+     */
+    private static final Set<String> SENSITIVE_FIELD_NAMES =
+            Collections.unmodifiableSet(
+                    new HashSet<>(
+                            Arrays.asList(
+                                    "password",
+                                    "secret",
+                                    "fs.azure.account.key",
+                                    "apikey",
+                                    "api-key",
+                                    "auth-params",
+                                    "service-key",
+                                    "token",
+                                    "basic-auth",
+                                    "jaas.config",
+                                    "http-headers")));
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public CatalogConnection createConnection(
+            SensitiveConnection connection, WritableSecretStore secretStore) {
+        Map<String, String> allOptions = connection.getOptions();
+
+        if (allOptions.containsKey(SECRET_REFERENCE_KEY)) {
+            throw new ValidationException(
+                    String.format(
+                            "Connection option '%s' is reserved and cannot be 
set by users.",
+                            SECRET_REFERENCE_KEY));
+        }
+
+        Map<String, String> sensitiveOptions =
+                allOptions.entrySet().stream()
+                        .filter(e -> 
SENSITIVE_FIELD_NAMES.contains(e.getKey()))
+                        .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+        Map<String, String> nonSensitiveOptions =
+                allOptions.entrySet().stream()
+                        .filter(e -> 
!SENSITIVE_FIELD_NAMES.contains(e.getKey()))
+                        .collect(
+                                Collectors.toMap(
+                                        Map.Entry::getKey,
+                                        Map.Entry::getValue,
+                                        (a, b) -> a,
+                                        HashMap::new));
+
+        if (!sensitiveOptions.isEmpty()) {
+            final String secretId;
+            try {
+                secretId = secretStore.storeSecret(sensitiveOptions);
+            } catch (SecretException e) {
+                throw e;
+            } catch (RuntimeException e) {
+                throw new SecretException("Failed to store connection 
secret.", e);
+            }
+            nonSensitiveOptions.put(SECRET_REFERENCE_KEY, secretId);
+        }
+
+        return CatalogConnection.of(nonSensitiveOptions, 
connection.getComment());
+    }
+
+    @Override
+    public SensitiveConnection resolveConnection(
+            CatalogConnection connection, ReadableSecretStore secretStore) {
+        Map<String, String> options = new HashMap<>(connection.getOptions());
+
+        String secretId = options.remove(SECRET_REFERENCE_KEY);
+        if (secretId != null) {
+            try {
+                Map<String, String> secrets = secretStore.getSecret(secretId);
+                options.putAll(secrets);
+            } catch (SecretNotFoundException e) {
+                throw new ValidationException(
+                        String.format(
+                                "Failed to resolve connection secrets. Secret 
with ID '%s' not found.",
+                                secretId),
+                        e);
+            } catch (SecretException e) {
+                throw e;
+            } catch (RuntimeException e) {
+                throw new SecretException(
+                        String.format(
+                                "Failed to retrieve connection secret with ID 
'%s'.", secretId),
+                        e);
+            }
+        }
+
+        return SensitiveConnection.of(options, connection.getComment());
+    }
+
+    @Override
+    public void deleteSecrets(CatalogConnection connection, 
WritableSecretStore secretStore) {
+        String secretId = connection.getOptions().get(SECRET_REFERENCE_KEY);
+        if (secretId != null) {
+            try {
+                secretStore.removeSecret(secretId);
+            } catch (SecretException e) {
+                throw e;
+            } catch (RuntimeException e) {
+                throw new SecretException(
+                        String.format("Failed to remove connection secret with 
ID '%s'.", secretId),
+                        e);
+            }
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
index a61da0902ae..c4ecf751c6d 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
@@ -107,6 +107,14 @@ public final class FactoryUtil {
                             "Uniquely identifies the provider of a model that 
is used for model inference."
                                     + " Its value is used during model 
provider discovery.");
 
+    public static final ConfigOption<String> CONNECTION_TYPE =
+            ConfigOptions.key("type")
+                    .stringType()
+                    .defaultValue(DefaultConnectionFactory.IDENTIFIER)
+                    .withDescription(
+                            "Identifies the type of a connection. Its value is 
used during"
+                                    + " ConnectionFactory discovery.");
+
     public static final ConfigOption<String> FORMAT =
             ConfigOptions.key("format")
                     .stringType()
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/ReadableSecretStore.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/ReadableSecretStore.java
index 70dd48e21f9..1cf7120abed 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/ReadableSecretStore.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/ReadableSecretStore.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.secret;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.secret.exceptions.SecretException;
 import org.apache.flink.table.secret.exceptions.SecretNotFoundException;
 
 import java.util.Map;
@@ -40,6 +41,8 @@ public interface ReadableSecretStore extends SecretStore {
      * @param secretId the unique identifier of the secret to retrieve
      * @return a map containing the secret data as key-value pairs
      * @throws SecretNotFoundException if the secret with the given identifier 
does not exist
+     * @throws SecretException if the operation fails due to underlying-store 
errors (network,
+     *     permission, etc.)
      */
-    Map<String, String> getSecret(String secretId) throws 
SecretNotFoundException;
+    Map<String, String> getSecret(String secretId) throws 
SecretNotFoundException, SecretException;
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/WritableSecretStore.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/WritableSecretStore.java
index db5037b7b2f..11167d861a6 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/WritableSecretStore.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/WritableSecretStore.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.secret;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.secret.exceptions.SecretException;
 import org.apache.flink.table.secret.exceptions.SecretNotFoundException;
 
 import java.util.Map;
@@ -40,15 +41,19 @@ public interface WritableSecretStore extends SecretStore {
      *
      * @param secretData a map containing the secret data as key-value pairs 
to be stored
      * @return a unique identifier for the stored secret
+     * @throws SecretException if the operation fails due to underlying-store 
errors (network,
+     *     permission, quota, etc.)
      */
-    String storeSecret(Map<String, String> secretData);
+    String storeSecret(Map<String, String> secretData) throws SecretException;
 
     /**
      * Removes a secret from the secret store.
      *
      * @param secretId the unique identifier of the secret to remove
+     * @throws SecretException if the operation fails due to underlying-store 
errors (network,
+     *     permission, etc.)
      */
-    void removeSecret(String secretId);
+    void removeSecret(String secretId) throws SecretException;
 
     /**
      * Atomically updates an existing secret with new data.
@@ -58,7 +63,9 @@ public interface WritableSecretStore extends SecretStore {
      * @param secretId the unique identifier of the secret to update
      * @param newSecretData a map containing the new secret data as key-value 
pairs
      * @throws SecretNotFoundException if the secret with the given identifier 
does not exist
+     * @throws SecretException if the operation fails due to underlying-store 
errors (network,
+     *     permission, etc.)
      */
     void updateSecret(String secretId, Map<String, String> newSecretData)
-            throws SecretNotFoundException;
+            throws SecretNotFoundException, SecretException;
 }
diff --git 
a/flink-table/flink-table-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/flink-table/flink-table-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 456c161e7dd..ec8fc758b35 100644
--- 
a/flink-table/flink-table-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ 
b/flink-table/flink-table-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -14,3 +14,4 @@
 # limitations under the License.
 
 org.apache.flink.table.module.CoreModuleFactory
+org.apache.flink.table.factories.DefaultConnectionFactory
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
index 228b5485f01..f93e9f0575b 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.table.catalog;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
+import 
org.apache.flink.table.catalog.exceptions.ConnectionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.ConnectionNotExistException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
@@ -68,12 +70,16 @@ public abstract class CatalogTest {
     protected final String t3 = "t3";
     protected final String m1 = "m1";
     protected final String m2 = "m2";
+    protected final String c1 = "c1";
+    protected final String c2 = "c2";
     protected final ObjectPath path1 = new ObjectPath(db1, t1);
     protected final ObjectPath path2 = new ObjectPath(db2, t2);
     protected final ObjectPath path3 = new ObjectPath(db1, t2);
     protected final ObjectPath path4 = new ObjectPath(db1, t3);
     protected final ObjectPath modelPath1 = new ObjectPath(db1, m1);
     protected final ObjectPath modelPath2 = new ObjectPath(db1, m2);
+    protected final ObjectPath connectionPath1 = new ObjectPath(db1, c1);
+    protected final ObjectPath connectionPath2 = new ObjectPath(db1, c2);
     protected final ObjectPath nonExistDbPath = 
ObjectPath.fromString("non.exist");
     protected final ObjectPath nonExistObjectPath = 
ObjectPath.fromString("db1.nonexist");
 
@@ -108,6 +114,14 @@ public abstract class CatalogTest {
                 catalog.dropModel(modelPath2, true);
             }
         }
+        if (supportsConnections()) {
+            if (catalog.connectionExists(connectionPath1)) {
+                catalog.dropConnection(connectionPath1, true);
+            }
+            if (catalog.connectionExists(connectionPath2)) {
+                catalog.dropConnection(connectionPath2, true);
+            }
+        }
 
         // Delete db last so that other resources can be found and dropped
         if (catalog.databaseExists(db1)) {
@@ -463,6 +477,201 @@ public abstract class CatalogTest {
         catalog.dropModel(modelPath1, true);
     }
 
+    // ------ connections ------
+    @Test
+    public void testCreateConnection() throws Exception {
+        if (!supportsConnections()) {
+            return;
+        }
+        catalog.createDatabase(db1, createDb(), false);
+        CatalogConnection connection = createConnection();
+        catalog.createConnection(connectionPath1, connection, false);
+
+        List<String> connections = catalog.listConnections(db1);
+        assertThat(connections).isEqualTo(Collections.singletonList(c1));
+    }
+
+    @Test
+    public void testCreateConnection_DatabaseNotExistException() {
+        if (!supportsConnections()) {
+            return;
+        }
+        assertThat(catalog.databaseExists(db1)).isFalse();
+
+        assertThatThrownBy(
+                        () ->
+                                catalog.createConnection(
+                                        nonExistObjectPath, 
createConnection(), false))
+                .isInstanceOf(DatabaseNotExistException.class)
+                .hasMessage("Database db1 does not exist in Catalog " + 
TEST_CATALOG_NAME + ".");
+    }
+
+    @Test
+    public void testCreateConnection_ConnectionAlreadyExistException() throws 
Exception {
+        if (!supportsConnections()) {
+            return;
+        }
+        catalog.createDatabase(db1, createDb(), false);
+        catalog.createConnection(connectionPath1, createConnection(), false);
+
+        assertThatThrownBy(
+                        () -> catalog.createConnection(connectionPath1, 
createConnection(), false))
+                .isInstanceOf(ConnectionAlreadyExistException.class)
+                .hasMessage(
+                        "Connection 'db1.c1' already exists in catalog '"
+                                + TEST_CATALOG_NAME
+                                + "'.");
+    }
+
+    @Test
+    public void testCreateConnection_ConnectionAlreadyExist_ignored() throws 
Exception {
+        if (!supportsConnections()) {
+            return;
+        }
+        catalog.createDatabase(db1, createDb(), false);
+
+        CatalogConnection connection = createConnection();
+        catalog.createConnection(connectionPath1, connection, false);
+        catalog.createConnection(connectionPath1, connection, true);
+
+        List<String> connections = catalog.listConnections(db1);
+        assertThat(connections).isEqualTo(Collections.singletonList(c1));
+    }
+
+    @Test
+    public void testListConnections() throws Exception {
+        if (!supportsConnections()) {
+            return;
+        }
+        catalog.createDatabase(db1, createDb(), false);
+
+        catalog.createConnection(connectionPath1, createConnection(), false);
+        catalog.createConnection(connectionPath2, createConnection(), false);
+
+        assertThat(catalog.listConnections(db1)).isEqualTo(Arrays.asList(c1, 
c2));
+    }
+
+    @Test
+    public void testGetConnection() throws Exception {
+        if (!supportsConnections()) {
+            return;
+        }
+        catalog.createDatabase(db1, createDb(), false);
+        catalog.createConnection(connectionPath1, createConnection(), false);
+        assertThat(catalog.getConnection(connectionPath1)).isNotNull();
+    }
+
+    @Test
+    public void testGetConnection_ConnectionNotExistException() throws 
Exception {
+        if (!supportsConnections()) {
+            return;
+        }
+        catalog.createDatabase(db1, createDb(), false);
+        assertThatThrownBy(() -> catalog.getConnection(connectionPath1))
+                .isInstanceOf(ConnectionNotExistException.class)
+                .hasMessage("Connection 'db1.c1' does not exist in catalog 
'test-catalog'.");
+    }
+
+    @Test
+    public void testDropConnection() throws Exception {
+        if (!supportsConnections()) {
+            return;
+        }
+        catalog.createDatabase(db1, createDb(), false);
+        catalog.createConnection(connectionPath1, createConnection(), false);
+        assertThat(catalog.getConnection(connectionPath1)).isNotNull();
+        catalog.dropConnection(connectionPath1, false);
+        assertThatThrownBy(() -> catalog.getConnection(connectionPath1))
+                .isInstanceOf(ConnectionNotExistException.class)
+                .hasMessage("Connection 'db1.c1' does not exist in catalog 
'test-catalog'.");
+    }
+
+    @Test
+    public void testAlterConnection() throws Exception {
+        if (!supportsConnections()) {
+            return;
+        }
+        catalog.createDatabase(db1, createDb(), false);
+        catalog.createConnection(connectionPath1, createConnection(), false);
+        assertThat(catalog.getConnection(connectionPath1)).isNotNull();
+        CatalogConnection newConnection =
+                CatalogConnection.of(
+                        new HashMap<String, String>() {
+                            {
+                                put("type", "kafka");
+                                put("bootstrap.servers", "remote:9092");
+                                put("group.id", "my-group");
+                            }
+                        },
+                        "updated connection");
+        catalog.alterConnection(connectionPath1, newConnection, false);
+        assertThat(catalog.getConnection(connectionPath1).getComment())
+                .isEqualTo("updated connection");
+        Map<String, String> expectedOptions = new HashMap<>();
+        expectedOptions.put("type", "kafka");
+        expectedOptions.put("bootstrap.servers", "remote:9092");
+        expectedOptions.put("group.id", "my-group");
+        
assertThat(catalog.getConnection(connectionPath1).getOptions()).isEqualTo(expectedOptions);
+    }
+
+    @Test
+    public void testAlterConnection_ConnectionNotExistException() throws 
Exception {
+        if (!supportsConnections()) {
+            return;
+        }
+        catalog.createDatabase(db1, createDb(), false);
+        CatalogConnection newConnection =
+                CatalogConnection.of(
+                        new HashMap<String, String>() {
+                            {
+                                put("type", "kafka");
+                                put("bootstrap.servers", "remote:9092");
+                            }
+                        },
+                        "new connection");
+        assertThatThrownBy(() -> catalog.alterConnection(connectionPath1, 
newConnection, false))
+                .isInstanceOf(ConnectionNotExistException.class)
+                .hasMessage("Connection 'db1.c1' does not exist in catalog 
'test-catalog'.");
+    }
+
+    @Test
+    public void testAlterMissingConnectionIgnoreIfNotExist() throws Exception {
+        if (!supportsConnections()) {
+            return;
+        }
+        catalog.createDatabase(db1, createDb(), false);
+        CatalogConnection newConnection =
+                CatalogConnection.of(
+                        new HashMap<String, String>() {
+                            {
+                                put("type", "kafka");
+                                put("bootstrap.servers", "remote:9092");
+                            }
+                        },
+                        "new connection");
+        catalog.alterConnection(connectionPath1, newConnection, true);
+    }
+
+    @Test
+    public void testDropMissingConnectionNotExistException() throws Exception {
+        if (!supportsConnections()) {
+            return;
+        }
+        catalog.createDatabase(db1, createDb(), false);
+        assertThatThrownBy(() -> catalog.dropConnection(connectionPath1, 
false))
+                .isInstanceOf(ConnectionNotExistException.class)
+                .hasMessage("Connection 'db1.c1' does not exist in catalog 
'test-catalog'.");
+    }
+
+    @Test
+    public void testDropMissingConnectionIgnoreIfNotExist() throws Exception {
+        if (!supportsConnections()) {
+            return;
+        }
+        catalog.createDatabase(db1, createDb(), false);
+        catalog.dropConnection(connectionPath1, true);
+    }
+
     // ------ tables ------
 
     @Test
@@ -1614,6 +1823,19 @@ public abstract class CatalogTest {
 
     protected abstract boolean supportsModels();
 
+    protected abstract boolean supportsConnections();
+
+    protected CatalogConnection createConnection() {
+        return CatalogConnection.of(
+                new HashMap<String, String>() {
+                    {
+                        put("type", "kafka");
+                        put("bootstrap.servers", "localhost:9092");
+                    }
+                },
+                null);
+    }
+
     protected ResolvedSchema createSchema() {
         return new ResolvedSchema(
                 Arrays.asList(
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/DefaultConnectionFactoryTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/DefaultConnectionFactoryTest.java
new file mode 100644
index 00000000000..50ebac09c69
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/DefaultConnectionFactoryTest.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogConnection;
+import org.apache.flink.table.catalog.SensitiveConnection;
+import org.apache.flink.table.secret.ReadableSecretStore;
+import org.apache.flink.table.secret.WritableSecretStore;
+import org.apache.flink.table.secret.exceptions.SecretException;
+import org.apache.flink.table.secret.exceptions.SecretNotFoundException;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link DefaultConnectionFactory}. */
+class DefaultConnectionFactoryTest {
+
+    private final DefaultConnectionFactory factory = new 
DefaultConnectionFactory();
+
+    // 
---------------------------------------------------------------------------------------------
+    // createConnection
+    // 
---------------------------------------------------------------------------------------------
+
+    @Test
+    void createConnectionWithoutSensitiveOptionsDoesNotCallSecretStore() {
+        Map<String, String> options = new LinkedHashMap<>();
+        options.put("host", "localhost");
+        options.put("port", "9092");
+
+        RecordingSecretStore secretStore = new RecordingSecretStore();
+        CatalogConnection result =
+                factory.createConnection(
+                        SensitiveConnection.of(options, "my-comment"), 
secretStore);
+
+        assertThat(secretStore.stored).isEmpty();
+        assertThat(result.getOptions())
+                .containsExactlyInAnyOrderEntriesOf(options)
+                
.doesNotContainKey(DefaultConnectionFactory.SECRET_REFERENCE_KEY);
+        assertThat(result.getComment()).isEqualTo("my-comment");
+    }
+
+    @Test
+    void createConnectionWithSensitiveOptionsStoresOnlySensitiveOptions() {
+        Map<String, String> options = new LinkedHashMap<>();
+        options.put("host", "localhost");
+        options.put("password", "p@ss");
+        options.put("api-key", "abc");
+        options.put("token", "tok");
+
+        RecordingSecretStore secretStore = new RecordingSecretStore();
+        CatalogConnection result =
+                factory.createConnection(SensitiveConnection.of(options, 
null), secretStore);
+
+        // secretStore.storeSecret called exactly once with only the sensitive 
subset
+        assertThat(secretStore.stored).hasSize(1);
+        Map<String, String> storedSecret = 
secretStore.stored.values().iterator().next();
+        assertThat(storedSecret)
+                .containsOnlyKeys("password", "api-key", "token")
+                .containsEntry("password", "p@ss")
+                .containsEntry("api-key", "abc")
+                .containsEntry("token", "tok");
+
+        // Returned connection retains only non-sensitive options plus a 
secret reference
+        String secretId = secretStore.stored.keySet().iterator().next();
+        assertThat(result.getOptions())
+                .containsOnlyKeys("host", 
DefaultConnectionFactory.SECRET_REFERENCE_KEY)
+                .containsEntry("host", "localhost")
+                .containsEntry(DefaultConnectionFactory.SECRET_REFERENCE_KEY, 
secretId);
+    }
+
+    @Test
+    void createConnectionRecognizesAllSensitiveFieldNames() {
+        // Covers the default sensitive whitelist; failure here flags an 
accidental change to
+        // the whitelist.
+        List<String> sensitiveKeys =
+                Arrays.asList(
+                        "password",
+                        "secret",
+                        "fs.azure.account.key",
+                        "apikey",
+                        "api-key",
+                        "auth-params",
+                        "service-key",
+                        "token",
+                        "basic-auth",
+                        "jaas.config",
+                        "http-headers");
+
+        Map<String, String> options = new LinkedHashMap<>();
+        for (String key : sensitiveKeys) {
+            options.put(key, "value-of-" + key);
+        }
+
+        RecordingSecretStore secretStore = new RecordingSecretStore();
+        CatalogConnection result =
+                factory.createConnection(SensitiveConnection.of(options, 
null), secretStore);
+
+        assertThat(secretStore.stored).hasSize(1);
+        Map<String, String> storedSecret = 
secretStore.stored.values().iterator().next();
+        assertThat(storedSecret).containsExactlyInAnyOrderEntriesOf(options);
+        assertThat(result.getOptions())
+                
.containsOnlyKeys(DefaultConnectionFactory.SECRET_REFERENCE_KEY);
+    }
+
+    @Test
+    void createConnectionRejectsUserSuppliedReservedKey() {
+        Map<String, String> options =
+                Collections.singletonMap(
+                        DefaultConnectionFactory.SECRET_REFERENCE_KEY, 
"user-injected");
+
+        RecordingSecretStore secretStore = new RecordingSecretStore();
+        assertThatThrownBy(
+                        () ->
+                                factory.createConnection(
+                                        SensitiveConnection.of(options, null), 
secretStore))
+                .isInstanceOf(ValidationException.class)
+                
.hasMessageContaining(DefaultConnectionFactory.SECRET_REFERENCE_KEY)
+                .hasMessageContaining("reserved");
+        assertThat(secretStore.stored).isEmpty();
+    }
+
+    @Test
+    void createConnectionPropagatesSecretException() {
+        Map<String, String> options = Collections.singletonMap("password", 
"p@ss");
+        SecretException original = new SecretException("downstream failure");
+
+        RecordingSecretStore secretStore =
+                new RecordingSecretStore() {
+                    @Override
+                    public String storeSecret(Map<String, String> secretData) {
+                        throw original;
+                    }
+                };
+
+        assertThatThrownBy(
+                        () ->
+                                factory.createConnection(
+                                        SensitiveConnection.of(options, null), 
secretStore))
+                .isSameAs(original);
+    }
+
+    @Test
+    void createConnectionWrapsRuntimeExceptionFromStore() {
+        Map<String, String> options = Collections.singletonMap("password", 
"p@ss");
+
+        RecordingSecretStore secretStore =
+                new RecordingSecretStore() {
+                    @Override
+                    public String storeSecret(Map<String, String> secretData) {
+                        throw new IllegalStateException("kaboom");
+                    }
+                };
+
+        assertThatThrownBy(
+                        () ->
+                                factory.createConnection(
+                                        SensitiveConnection.of(options, null), 
secretStore))
+                .isInstanceOf(SecretException.class)
+                .hasMessageContaining("Failed to store connection secret")
+                .hasCauseInstanceOf(IllegalStateException.class);
+    }
+
+    // 
---------------------------------------------------------------------------------------------
+    // resolveConnection
+    // 
---------------------------------------------------------------------------------------------
+
+    @Test
+    void resolveConnectionWithoutSecretReferenceReturnsOptionsUnchanged() {
+        Map<String, String> options = new LinkedHashMap<>();
+        options.put("host", "localhost");
+        options.put("port", "9092");
+
+        RecordingSecretStore secretStore = new RecordingSecretStore();
+        SensitiveConnection resolved =
+                factory.resolveConnection(
+                        CatalogConnection.of(options, "the-comment"), 
secretStore);
+
+        assertThat(secretStore.retrieved).isEmpty();
+        
assertThat(resolved.getOptions()).containsExactlyInAnyOrderEntriesOf(options);
+        assertThat(resolved.getComment()).isEqualTo("the-comment");
+    }
+
+    @Test
+    void resolveConnectionMergesSecretsBackIntoOptions() {
+        RecordingSecretStore secretStore = new RecordingSecretStore();
+        Map<String, String> secrets = new HashMap<>();
+        secrets.put("password", "p@ss");
+        secrets.put("token", "tok");
+        String secretId = secretStore.storeSecret(secrets);
+
+        Map<String, String> catalogOptions = new LinkedHashMap<>();
+        catalogOptions.put("host", "localhost");
+        catalogOptions.put(DefaultConnectionFactory.SECRET_REFERENCE_KEY, 
secretId);
+
+        SensitiveConnection resolved =
+                factory.resolveConnection(CatalogConnection.of(catalogOptions, 
null), secretStore);
+
+        assertThat(secretStore.retrieved).containsExactly(secretId);
+        assertThat(resolved.getOptions())
+                .containsOnlyKeys("host", "password", "token")
+                .containsEntry("host", "localhost")
+                .containsEntry("password", "p@ss")
+                .containsEntry("token", "tok")
+                
.doesNotContainKey(DefaultConnectionFactory.SECRET_REFERENCE_KEY);
+    }
+
+    @Test
+    void resolveConnectionTranslatesSecretNotFoundToValidationException() {
+        // Empty stored map → fake's getSecret will throw 
SecretNotFoundException by default.
+        RecordingSecretStore secretStore = new RecordingSecretStore();
+
+        Map<String, String> catalogOptions =
+                
Collections.singletonMap(DefaultConnectionFactory.SECRET_REFERENCE_KEY, 
"missing");
+
+        assertThatThrownBy(
+                        () ->
+                                factory.resolveConnection(
+                                        CatalogConnection.of(catalogOptions, 
null), secretStore))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("'missing'")
+                .hasCauseInstanceOf(SecretNotFoundException.class);
+    }
+
+    @Test
+    void resolveConnectionWrapsRuntimeExceptionFromStore() {
+        RecordingSecretStore secretStore =
+                new RecordingSecretStore() {
+                    @Override
+                    public Map<String, String> getSecret(String secretId) {
+                        throw new IllegalStateException("transport down");
+                    }
+                };
+
+        Map<String, String> catalogOptions =
+                
Collections.singletonMap(DefaultConnectionFactory.SECRET_REFERENCE_KEY, "abc");
+
+        assertThatThrownBy(
+                        () ->
+                                factory.resolveConnection(
+                                        CatalogConnection.of(catalogOptions, 
null), secretStore))
+                .isInstanceOf(SecretException.class)
+                .hasMessageContaining("'abc'")
+                .hasCauseInstanceOf(IllegalStateException.class);
+    }
+
+    // 
---------------------------------------------------------------------------------------------
+    // deleteSecrets
+    // 
---------------------------------------------------------------------------------------------
+
+    @Test
+    void deleteSecretsWithoutReferenceIsNoOp() {
+        Map<String, String> catalogOptions = Collections.singletonMap("host", 
"localhost");
+
+        RecordingSecretStore secretStore = new RecordingSecretStore();
+        factory.deleteSecrets(CatalogConnection.of(catalogOptions, null), 
secretStore);
+
+        assertThat(secretStore.removed).isEmpty();
+    }
+
+    @Test
+    void deleteSecretsRemovesByReference() {
+        RecordingSecretStore secretStore = new RecordingSecretStore();
+        String secretId = 
secretStore.storeSecret(Collections.singletonMap("password", "p@ss"));
+
+        Map<String, String> catalogOptions = new LinkedHashMap<>();
+        catalogOptions.put("host", "localhost");
+        catalogOptions.put(DefaultConnectionFactory.SECRET_REFERENCE_KEY, 
secretId);
+
+        factory.deleteSecrets(CatalogConnection.of(catalogOptions, null), 
secretStore);
+
+        assertThat(secretStore.removed).containsExactly(secretId);
+        assertThat(secretStore.stored).doesNotContainKey(secretId);
+    }
+
+    @Test
+    void deleteSecretsWrapsRuntimeExceptionFromStore() {
+        RecordingSecretStore secretStore =
+                new RecordingSecretStore() {
+                    @Override
+                    public void removeSecret(String secretId) {
+                        throw new IllegalStateException("transport down");
+                    }
+                };
+
+        Map<String, String> catalogOptions =
+                
Collections.singletonMap(DefaultConnectionFactory.SECRET_REFERENCE_KEY, "abc");
+
+        assertThatThrownBy(
+                        () ->
+                                factory.deleteSecrets(
+                                        CatalogConnection.of(catalogOptions, 
null), secretStore))
+                .isInstanceOf(SecretException.class)
+                .hasMessageContaining("'abc'")
+                .hasCauseInstanceOf(IllegalStateException.class);
+    }
+
+    // 
---------------------------------------------------------------------------------------------
+    // round-trip
+    // 
---------------------------------------------------------------------------------------------
+
+    @Test
+    void createAndResolveRoundTripPreservesOptionsAndComment() {
+        Map<String, String> options = new LinkedHashMap<>();
+        options.put("host", "localhost");
+        options.put("port", "9092");
+        options.put("password", "p@ss");
+        options.put("token", "tok");
+
+        RecordingSecretStore secretStore = new RecordingSecretStore();
+        CatalogConnection persisted =
+                factory.createConnection(
+                        SensitiveConnection.of(options, "round-trip"), 
secretStore);
+        SensitiveConnection resolved = factory.resolveConnection(persisted, 
secretStore);
+
+        
assertThat(resolved.getOptions()).containsExactlyInAnyOrderEntriesOf(options);
+        assertThat(resolved.getComment()).isEqualTo("round-trip");
+    }
+
+    // 
---------------------------------------------------------------------------------------------
+    // Helpers
+    // 
---------------------------------------------------------------------------------------------
+
+    /**
+     * Fake secret store that records all interactions; tests use the maps 
directly to verify how
+     * {@link DefaultConnectionFactory} invokes the store.
+     */
+    private static class RecordingSecretStore implements WritableSecretStore, 
ReadableSecretStore {
+
+        final Map<String, Map<String, String>> stored = new LinkedHashMap<>();
+        final List<String> retrieved = new java.util.ArrayList<>();
+        final List<String> removed = new java.util.ArrayList<>();
+
+        @Override
+        public String storeSecret(Map<String, String> secretData) {
+            String id = UUID.randomUUID().toString();
+            stored.put(id, new LinkedHashMap<>(secretData));
+            return id;
+        }
+
+        @Override
+        public void removeSecret(String secretId) {
+            removed.add(secretId);
+            stored.remove(secretId);
+        }
+
+        @Override
+        public void updateSecret(String secretId, Map<String, String> 
newSecretData)
+                throws SecretNotFoundException {
+            if (!stored.containsKey(secretId)) {
+                throw new SecretNotFoundException("Secret '" + secretId + "' 
not found.");
+            }
+            stored.put(secretId, new LinkedHashMap<>(newSecretData));
+        }
+
+        @Override
+        public Map<String, String> getSecret(String secretId) throws 
SecretNotFoundException {
+            retrieved.add(secretId);
+            Map<String, String> result = stored.get(secretId);
+            if (result == null) {
+                throw new SecretNotFoundException("Secret '" + secretId + "' 
not found.");
+            }
+            return result;
+        }
+    }
+}

Reply via email to