lihaosky commented on code in PR #28085:
URL: https://github.com/apache/flink/pull/28085#discussion_r3236339101


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##########
@@ -1791,6 +1836,328 @@ public ResolvedCatalogModel 
resolveCatalogModel(CatalogModel model) {
         return ResolvedCatalogModel.of(model, resolvedInputSchema, 
resolvedOutputSchema);
     }
 
+    // ------ connections ------
+
+    /**
+     * Get a connection from the catalog with the given object identifier.
+     *
+     * @param objectIdentifier The fully qualified path of the connection.
+     * @return The requested connection wrapped in Optional.
+     */
+    public Optional<CatalogConnection> getConnection(ObjectIdentifier 
objectIdentifier) {
+        CatalogConnection temporaryConnection = 
temporaryConnections.get(objectIdentifier);
+        if (temporaryConnection != null) {
+            return Optional.of(temporaryConnection);
+        }
+
+        Optional<Catalog> catalog = 
getCatalog(objectIdentifier.getCatalogName());
+        if (catalog.isPresent()) {
+            try {
+                return 
Optional.of(catalog.get().getConnection(objectIdentifier.toObjectPath()));
+            } catch (ConnectionNotExistException | 
UnsupportedOperationException e) {
+                // ConnectionNotExistException: connection does not exist in 
this catalog.
+                // UnsupportedOperationException: catalog does not support 
connections.
+                return Optional.empty();
+            }
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    /**
+     * List all connections in the given catalog and database.
+     *
+     * @param catalogName The name of the catalog.
+     * @param databaseName The name of the database.
+     * @return A set of connection names.
+     */
+    public Set<String> listConnections(String catalogName, String 
databaseName) {
+        Catalog catalog = getCatalogOrError(catalogName);
+        try {
+            Set<String> connections = new 
HashSet<>(catalog.listConnections(databaseName));
+
+            // Add temporary connections for this catalog and database
+            temporaryConnections.keySet().stream()
+                    .filter(
+                            identifier ->
+                                    
identifier.getCatalogName().equals(catalogName)
+                                            && 
identifier.getDatabaseName().equals(databaseName))
+                    .map(ObjectIdentifier::getObjectName)
+                    .forEach(connections::add);
+
+            return connections;
+        } catch (DatabaseNotExistException e) {
+            throw new ValidationException(
+                    String.format(
+                            "Database %s does not exist in catalog %s.", 
databaseName, catalogName),
+                    e);
+        } catch (CatalogException e) {
+            throw new TableException(
+                    String.format(
+                            "Failed to list connections in catalog %s and 
database %s.",
+                            catalogName, databaseName),
+                    e);
+        }
+    }
+
+    /**
+     * Create a permanent connection in the given fully qualified path.
+     *
+     * <p>If a {@link ConnectionFactory} and {@link WritableSecretStore} are 
configured, sensitive
+     * fields are extracted from the connection and stored in the secret store 
before persisting the
+     * non-sensitive {@link CatalogConnection} to the catalog.
+     *
+     * @param connection The connection with all options including sensitive 
fields.
+     * @param objectIdentifier The fully qualified path where to create the 
connection.
+     * @param ignoreIfExists If false exception will be thrown if the 
connection already exists.
+     */
+    public void createConnection(
+            SensitiveConnection connection,
+            ObjectIdentifier objectIdentifier,
+            boolean ignoreIfExists) {
+        if (connectionFactory == null || writableSecretStore == null) {
+            throw new ValidationException(
+                    "ConnectionFactory and WritableSecretStore must be 
configured to create connections.");
+        }
+        if (getConnection(objectIdentifier).isPresent()) {
+            if (ignoreIfExists) {
+                return;
+            }
+            throw new ValidationException(
+                    String.format(
+                            "Connection with identifier '%s' already exists.",
+                            objectIdentifier.asSummaryString()));
+        }
+        final CatalogConnection catalogConnection =
+                connectionFactory.createConnection(connection, 
writableSecretStore);
+        boolean persisted = false;
+        try {
+            execute(
+                    (catalog, path) -> {
+                        catalog.createConnection(path, catalogConnection, 
ignoreIfExists);
+                        catalogModificationListeners.forEach(
+                                listener ->
+                                        listener.onEvent(
+                                                
CreateConnectionEvent.createEvent(
+                                                        
CatalogContext.createContext(
+                                                                
objectIdentifier.getCatalogName(),
+                                                                catalog),
+                                                        objectIdentifier,
+                                                        catalogConnection,
+                                                        ignoreIfExists,
+                                                        false)));
+                    },
+                    objectIdentifier,
+                    ignoreIfExists,
+                    "CreateConnection");
+            persisted = true;
+        } finally {
+            if (!persisted) {
+                tryDeleteSecrets(
+                        catalogConnection,
+                        writableSecretStore,
+                        "rollback createConnection " + objectIdentifier);
+            }
+        }
+    }
+
+    /**
+     * Create a temporary connection in the given fully qualified path.
+     *
+     * @param connection The connection with all options including sensitive 
fields.
+     * @param objectIdentifier The fully qualified path where to create the 
connection.
+     * @param ignoreIfExists If false exception will be thrown if the 
connection already exists.
+     */
+    public void createTemporaryConnection(
+            SensitiveConnection connection,
+            ObjectIdentifier objectIdentifier,
+            boolean ignoreIfExists) {
+        if (connectionFactory == null) {

Review Comment:
   You are right. ConnectionFactory should be discovered by SPI. Can I fix it 
in a separate PR? This PR is already large. I created 
https://issues.apache.org/jira/browse/FLINK-39678 to track it.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DefaultConnectionFactory.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogConnection;
+import org.apache.flink.table.catalog.SensitiveConnection;
+import org.apache.flink.table.secret.ReadableSecretStore;
+import org.apache.flink.table.secret.WritableSecretStore;
+import org.apache.flink.table.secret.exceptions.SecretException;
+import org.apache.flink.table.secret.exceptions.SecretNotFoundException;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Default implementation of {@link ConnectionFactory} that identifies 
sensitive fields by a
+ * predefined whitelist of field names.
+ *
+ * <p>During {@link #createConnection}, sensitive fields are extracted from 
the connection options
+ * and stored as a single secret in the {@link WritableSecretStore}. A 
reference key ({@value
+ * #SECRET_REFERENCE_KEY}) pointing to the stored secret is added to the 
returned {@link
+ * CatalogConnection}.
+ *
+ * <p>During {@link #resolveConnection}, the secret reference is used to 
retrieve the sensitive
+ * fields from the {@link ReadableSecretStore} and merge them back into the 
options.
+ *
+ * <p>The following field names are treated as sensitive by default: {@code 
password}, {@code
+ * secret}, {@code fs.azure.account.key}, {@code apikey}, {@code api-key}, 
{@code auth-params},
+ * {@code service-key}, {@code token}, {@code basic-auth}, {@code 
jaas.config}, {@code
+ * http-headers}.
+ */
+@Internal
+public class DefaultConnectionFactory implements ConnectionFactory {
+
+    /**
+     * Reserved option key used to store the reference to secrets in the 
secret store. The
+     * surrounding double underscores make collision with user-supplied option 
names unlikely; user
+     * options containing this key will be rejected at create-time.
+     */
+    public static final String SECRET_REFERENCE_KEY = 
"__flink.encrypted_secret_key__";
+
+    private static final Set<String> SENSITIVE_FIELD_NAMES =
+            Collections.unmodifiableSet(
+                    new HashSet<>(
+                            Arrays.asList(
+                                    "password",
+                                    "secret",
+                                    "fs.azure.account.key",
+                                    "apikey",
+                                    "api-key",
+                                    "auth-params",
+                                    "service-key",
+                                    "token",
+                                    "basic-auth",
+                                    "jaas.config",
+                                    "http-headers")));
+
+    @Override
+    public String factoryIdentifier() {
+        return "default";
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public CatalogConnection createConnection(
+            SensitiveConnection connection, WritableSecretStore secretStore) {
+        Map<String, String> allOptions = connection.getOptions();
+
+        if (allOptions.containsKey(SECRET_REFERENCE_KEY)) {
+            throw new ValidationException(
+                    String.format(
+                            "Connection option '%s' is reserved and cannot be 
set by users.",
+                            SECRET_REFERENCE_KEY));
+        }
+
+        Map<String, String> sensitiveOptions =
+                allOptions.entrySet().stream()
+                        .filter(e -> 
SENSITIVE_FIELD_NAMES.contains(e.getKey()))
+                        .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+        Map<String, String> nonSensitiveOptions =
+                allOptions.entrySet().stream()
+                        .filter(e -> 
!SENSITIVE_FIELD_NAMES.contains(e.getKey()))
+                        .collect(
+                                Collectors.toMap(
+                                        Map.Entry::getKey,
+                                        Map.Entry::getValue,
+                                        (a, b) -> a,
+                                        HashMap::new));
+
+        if (!sensitiveOptions.isEmpty()) {
+            final String secretId;
+            try {
+                secretId = secretStore.storeSecret(sensitiveOptions);
+            } catch (SecretException e) {
+                throw e;
+            } catch (RuntimeException e) {
+                throw new SecretException("Failed to store connection 
secret.", e);
+            }
+            nonSensitiveOptions.put(SECRET_REFERENCE_KEY, secretId);

Review Comment:
   Let me add tests. `SECRET_REFERENCE_KEY` is not used outside of this class, 
so I can make it private.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DefaultConnectionFactory.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogConnection;
+import org.apache.flink.table.catalog.SensitiveConnection;
+import org.apache.flink.table.secret.ReadableSecretStore;
+import org.apache.flink.table.secret.WritableSecretStore;
+import org.apache.flink.table.secret.exceptions.SecretException;
+import org.apache.flink.table.secret.exceptions.SecretNotFoundException;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Default implementation of {@link ConnectionFactory} that identifies 
sensitive fields by a
+ * predefined whitelist of field names.
+ *
+ * <p>During {@link #createConnection}, sensitive fields are extracted from 
the connection options
+ * and stored as a single secret in the {@link WritableSecretStore}. A 
reference key ({@value
+ * #SECRET_REFERENCE_KEY}) pointing to the stored secret is added to the 
returned {@link
+ * CatalogConnection}.
+ *
+ * <p>During {@link #resolveConnection}, the secret reference is used to 
retrieve the sensitive
+ * fields from the {@link ReadableSecretStore} and merge them back into the 
options.
+ *
+ * <p>The following field names are treated as sensitive by default: {@code 
password}, {@code
+ * secret}, {@code fs.azure.account.key}, {@code apikey}, {@code api-key}, 
{@code auth-params},
+ * {@code service-key}, {@code token}, {@code basic-auth}, {@code 
jaas.config}, {@code
+ * http-headers}.
+ */
+@Internal
+public class DefaultConnectionFactory implements ConnectionFactory {
+
+    /**
+     * Reserved option key used to store the reference to secrets in the 
secret store. The
+     * surrounding double underscores make collision with user-supplied option 
names unlikely; user
+     * options containing this key will be rejected at create-time.
+     */
+    public static final String SECRET_REFERENCE_KEY = 
"__flink.encrypted_secret_key__";
+
+    private static final Set<String> SENSITIVE_FIELD_NAMES =
+            Collections.unmodifiableSet(
+                    new HashSet<>(
+                            Arrays.asList(
+                                    "password",

Review Comment:
   It's from the list here: 
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java#L49
I can add a comment. What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to