This is an automated email from the ASF dual-hosted git repository.

gaborgsomogyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 883ed7d37bc [FLINK-39561][security] Extend sensitive key redaction 
with missing access key patterns and user-configurable additional keys
883ed7d37bc is described below

commit 883ed7d37bcf5c7e38ceb01b3652a161bc26feae
Author: Gabor Somogyi <[email protected]>
AuthorDate: Mon May 4 11:45:22 2026 +0200

    [FLINK-39561][security] Extend sensitive key redaction with missing access 
key patterns and user-configurable additional keys
---
 .../generated/checkpointing_configuration.html     |  12 +--
 .../generated/common_checkpointing_section.html    |  12 +--
 .../generated/security_configuration.html          |   6 ++
 .../apache/flink/configuration/Configuration.java  |   7 +-
 .../flink/configuration/ConfigurationUtils.java    |  10 +-
 .../flink/configuration/GlobalConfiguration.java   |  27 ++++--
 .../flink/configuration/SecurityOptions.java       |  23 +++++
 .../flink/configuration/ConfigurationTest.java     |   7 +-
 .../configuration/ConfigurationUtilsTest.java      |   3 +-
 .../configuration/GlobalConfigurationTest.java     | 106 +++++++++++++++++----
 .../apache/flink/client/python/PythonEnvUtils.java |  16 +++-
 .../flink/client/python/PythonEnvUtilsTest.java    |   5 +-
 .../rpc/pekko/ActorSystemBootstrapTools.java       |   4 +-
 .../runtime/rest/handler/job/JobConfigHandler.java |  13 ++-
 .../runtime/rest/messages/ConfigurationInfo.java   |   4 +-
 .../flink/runtime/rest/messages/JobConfigInfo.java |   8 +-
 .../flink/runtime/util/EnvironmentInformation.java |   3 +-
 .../runtime/webmonitor/WebMonitorEndpoint.java     |   4 +-
 .../rest/handler/job/JobConfigHandlerTest.java     |   5 +-
 .../apache/flink/table/factories/FactoryUtil.java  |  49 ++++++++--
 .../factories/WorkflowSchedulerFactoryUtil.java    |   6 +-
 .../apache/flink/yarn/cli/FlinkYarnSessionCli.java |   3 +-
 22 files changed, 261 insertions(+), 72 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/checkpointing_configuration.html 
b/docs/layouts/shortcodes/generated/checkpointing_configuration.html
index b5e40254421..704ac492aa8 100644
--- a/docs/layouts/shortcodes/generated/checkpointing_configuration.html
+++ b/docs/layouts/shortcodes/generated/checkpointing_configuration.html
@@ -44,6 +44,12 @@
             <td>String</td>
             <td>The default directory used for storing the data files and meta 
data of checkpoints in a Flink supported filesystem. The storage path must be 
accessible from all participating processes/nodes(i.e. all TaskManagers and 
JobManagers). If the 'execution.checkpointing.storage' is set to 'jobmanager', 
only the meta data of checkpoints will be stored in this directory.</td>
         </tr>
+        <tr>
+            <td><h5>execution.checkpointing.during-recovery.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to enable checkpointing during recovery from an 
unaligned checkpoint. When enabled, the job can take checkpoints while still 
recovering channel state (inflight data) from a previous unaligned checkpoint. 
This avoids the need to wait for full recovery before the first checkpoint can 
be triggered, which reduces the window of vulnerability to failures during 
recovery.<br /><br />This option requires <code 
class="highlighter-rouge">execution.checkpointing.unaligned.re [...]
+        </tr>
         <tr>
             
<td><h5>execution.checkpointing.externalized-checkpoint-retention</h5></td>
             <td style="word-wrap: break-word;">NO_EXTERNALIZED_CHECKPOINTS</td>
@@ -158,12 +164,6 @@
             <td>Integer</td>
             <td>The tolerable checkpoint consecutive failure number. If set to 
0, that means we do not tolerance any checkpoint failure. This only applies to 
the following failure reasons: IOException on the Job Manager, failures in the 
async phase on the Task Managers and checkpoint expiration due to a timeout. 
Failures originating from the sync phase on the Task Managers are always 
forcing failover of an affected task. Other types of checkpoint failures (such 
as checkpoint being subsum [...]
         </tr>
-        <tr>
-            <td><h5>execution.checkpointing.during-recovery.enabled</h5></td>
-            <td style="word-wrap: break-word;">false</td>
-            <td>Boolean</td>
-            <td>Whether to enable checkpointing during recovery from an 
unaligned checkpoint. When enabled, the job can take checkpoints while still 
recovering channel state (inflight data) from a previous unaligned checkpoint. 
This avoids the need to wait for full recovery before the first checkpoint can 
be triggered, which reduces the window of vulnerability to failures during 
recovery.<br /><br />This option requires <code 
class="highlighter-rouge">execution.checkpointing.unaligned.re [...]
-        </tr>
         <tr>
             <td><h5>execution.checkpointing.unaligned.enabled</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git 
a/docs/layouts/shortcodes/generated/common_checkpointing_section.html 
b/docs/layouts/shortcodes/generated/common_checkpointing_section.html
index 938a86ddde5..1a5159022eb 100644
--- a/docs/layouts/shortcodes/generated/common_checkpointing_section.html
+++ b/docs/layouts/shortcodes/generated/common_checkpointing_section.html
@@ -32,6 +32,12 @@
             <td>Boolean</td>
             <td>Option whether to discard a checkpoint's states in parallel 
using the ExecutorService passed into the cleaner</td>
         </tr>
+        <tr>
+            <td><h5>execution.checkpointing.during-recovery.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to enable checkpointing during recovery from an 
unaligned checkpoint. When enabled, the job can take checkpoints while still 
recovering channel state (inflight data) from a previous unaligned checkpoint. 
This avoids the need to wait for full recovery before the first checkpoint can 
be triggered, which reduces the window of vulnerability to failures during 
recovery.<br /><br />This option requires <code 
class="highlighter-rouge">execution.checkpointing.unaligned.re [...]
+        </tr>
         <tr>
             <td><h5>execution.checkpointing.incremental</h5></td>
             <td style="word-wrap: break-word;">false</td>
@@ -56,12 +62,6 @@
             <td>Integer</td>
             <td>The maximum number of completed checkpoints to retain.</td>
         </tr>
-        <tr>
-            <td><h5>execution.checkpointing.during-recovery.enabled</h5></td>
-            <td style="word-wrap: break-word;">false</td>
-            <td>Boolean</td>
-            <td>Whether to enable checkpointing during recovery from an 
unaligned checkpoint. When enabled, the job can take checkpoints while still 
recovering channel state (inflight data) from a previous unaligned checkpoint. 
This avoids the need to wait for full recovery before the first checkpoint can 
be triggered, which reduces the window of vulnerability to failures during 
recovery.<br /><br />This option requires <code 
class="highlighter-rouge">execution.checkpointing.unaligned.re [...]
-        </tr>
         <tr>
             
<td><h5>execution.checkpointing.unaligned.recover-output-on-downstream.enabled</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git a/docs/layouts/shortcodes/generated/security_configuration.html 
b/docs/layouts/shortcodes/generated/security_configuration.html
index 9bd0e39823b..fdf554c6670 100644
--- a/docs/layouts/shortcodes/generated/security_configuration.html
+++ b/docs/layouts/shortcodes/generated/security_configuration.html
@@ -86,6 +86,12 @@
             <td>List&lt;String&gt;</td>
             <td>List of factories that should be used to instantiate security 
modules. All listed modules will be installed. Keep in mind that the configured 
security context might rely on some modules being present.</td>
         </tr>
+        <tr>
+            <td><h5>security.redaction.additional-keys</h5></td>
+            <td style="word-wrap: break-word;"></td>
+            <td>List&lt;String&gt;</td>
+            <td>Comma-separated list of additional configuration key 
substrings whose values should be redacted in logs and REST API responses. 
Matching is case-insensitive and based on substring containment. The built-in 
sensitive key patterns are immutable and cannot be overridden via this 
option.</td>
+        </tr>
         <tr>
             <td><h5>security.ssl.algorithms</h5></td>
             <td style="word-wrap: 
break-word;">"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"</td>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java 
b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index d7294752788..6676d1a7dab 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -376,7 +376,9 @@ public class Configuration extends 
ExecutionConfig.GlobalJobParameters
             }
         } catch (Exception e) {
             throw new IllegalArgumentException(
-                    GlobalConfiguration.isSensitive(option.key())
+                    GlobalConfiguration.isSensitive(
+                                    option.key(),
+                                    
this.get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS))
                             ? String.format("Could not parse value for key 
'%s'.", option.key())
                             : String.format(
                                     "Could not parse value '%s' for key '%s'.",
@@ -680,7 +682,8 @@ public class Configuration extends 
ExecutionConfig.GlobalJobParameters
                                 .collect(
                                         Collectors.toMap(
                                                 Map.Entry::getKey,
-                                                entry -> 
entry.getValue().toString())))
+                                                entry -> 
entry.getValue().toString())),
+                        this.get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS))
                 .toString();
     }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
index 338f9b660dc..af3b44da5ba 100755
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
@@ -170,19 +170,23 @@ public class ConfigurationUtils {
 
     /**
      * Replaces values whose keys are sensitive according to {@link
-     * GlobalConfiguration#isSensitive(String)} with {@link 
GlobalConfiguration#HIDDEN_CONTENT}.
+     * GlobalConfiguration#isSensitive(String, List)} with {@link
+     * GlobalConfiguration#HIDDEN_CONTENT}.
      *
      * <p>This can be useful when displaying configuration values.
      *
      * @param keyValuePairs for which to hide sensitive values
+     * @param additionalSensitiveKeys user-defined additional sensitive key 
substrings; use {@link
+     *     SecurityOptions#ADDITIONAL_SENSITIVE_KEYS} to obtain these from a 
loaded configuration
      * @return A map where all sensitive value are hidden
      */
     @Nonnull
-    public static Map<String, String> hideSensitiveValues(Map<String, String> 
keyValuePairs) {
+    public static Map<String, String> hideSensitiveValues(
+            Map<String, String> keyValuePairs, List<String> 
additionalSensitiveKeys) {
         final HashMap<String, String> result = new HashMap<>();
 
         for (Map.Entry<String, String> keyValuePair : 
keyValuePairs.entrySet()) {
-            if (GlobalConfiguration.isSensitive(keyValuePair.getKey())) {
+            if (GlobalConfiguration.isSensitive(keyValuePair.getKey(), 
additionalSensitiveKeys)) {
                 result.put(keyValuePair.getKey(), 
GlobalConfiguration.HIDDEN_CONTENT);
             } else {
                 result.put(keyValuePair.getKey(), keyValuePair.getValue());
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
index 513ab677a99..878f7304a3c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
@@ -58,7 +58,10 @@ public final class GlobalConfiguration {
                 "token",
                 "basic-auth",
                 "jaas.config",
-                "http-headers"
+                "http-headers",
+                "access-key",
+                "access.key",
+                "accesskey"
             };
 
     // the hidden content to be displayed
@@ -151,24 +154,27 @@ public final class GlobalConfiguration {
             configuration = loadYAMLResource(yamlConfigFile);
         }
 
-        logConfiguration("Loading", configuration);
+        final List<String> additionalKeys =
+                configuration.get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS);
+        logConfiguration("Loading", configuration, additionalKeys);
 
         if (dynamicProperties != null) {
-            logConfiguration("Loading dynamic", dynamicProperties);
+            logConfiguration("Loading dynamic", dynamicProperties, 
additionalKeys);
             configuration.addAll(dynamicProperties);
         }
 
         return configuration;
     }
 
-    private static void logConfiguration(String prefix, Configuration config) {
+    private static void logConfiguration(
+            String prefix, Configuration config, List<String> additionalKeys) {
         config.confData.forEach(
                 (key, value) ->
                         LOG.info(
                                 "{} configuration property: {}, {}",
                                 prefix,
                                 key,
-                                isSensitive(key) ? HIDDEN_CONTENT : value));
+                                isSensitive(key, additionalKeys) ? 
HIDDEN_CONTENT : value));
     }
 
     /**
@@ -263,8 +269,11 @@ public final class GlobalConfiguration {
      * Check whether the key is a hidden key.
      *
      * @param key the config key
+     * @param additionalKeys user-defined additional sensitive key substrings 
to check in addition
+     *     to the built-in list; use {@link 
SecurityOptions#ADDITIONAL_SENSITIVE_KEYS} to obtain
+     *     these from a loaded {@link Configuration}
      */
-    public static boolean isSensitive(String key) {
+    public static boolean isSensitive(String key, List<String> additionalKeys) 
{
         Preconditions.checkNotNull(key, "key is null");
         final String keyInLower = key.toLowerCase();
         for (String hideKey : SENSITIVE_KEYS) {
@@ -272,6 +281,12 @@ public final class GlobalConfiguration {
                 return true;
             }
         }
+        for (String hideKey : additionalKeys) {
+            final String hideKeyLower = hideKey.toLowerCase();
+            if (keyInLower.length() >= hideKeyLower.length() && 
keyInLower.contains(hideKeyLower)) {
+                return true;
+            }
+        }
         return false;
     }
 
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
index 7ec79f0c80a..32d8ee7a1e6 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
@@ -649,4 +649,27 @@ public class SecurityOptions {
         checkNotNull(sslConfig, "sslConfig");
         return isRestSSLEnabled(sslConfig) && 
sslConfig.get(SSL_REST_AUTHENTICATION_ENABLED);
     }
+
+    // ------------------------------------------------------------------------
+    //  Sensitive key redaction
+    // ------------------------------------------------------------------------
+
+    /**
+     * Additional sensitive key substrings to redact beyond the built-in list.
+     *
+     * <p>Values are matched case-insensitively as substrings of configuration 
key names. The
+     * built-in sensitive key patterns are immutable and cannot be overridden 
or removed via this
+     * option.
+     */
+    public static final ConfigOption<List<String>> ADDITIONAL_SENSITIVE_KEYS =
+            key("security.redaction.additional-keys")
+                    .stringType()
+                    .asList()
+                    .defaultValues()
+                    .withDescription(
+                            "Comma-separated list of additional configuration 
key substrings whose"
+                                    + " values should be redacted in logs and 
REST API responses."
+                                    + " Matching is case-insensitive and based 
on substring containment."
+                                    + " The built-in sensitive key patterns 
are immutable and cannot"
+                                    + " be overridden via this option.");
 }
diff --git 
a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
 
b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
index c61819fa088..758be3f4019 100644
--- 
a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
@@ -27,6 +27,7 @@ import org.junit.jupiter.api.Test;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -481,7 +482,7 @@ class ConfigurationTest {
         ConfigOption<List<String>> secret =
                 
ConfigOptions.key("secret").stringType().asList().noDefaultValue();
 
-        assertThat(GlobalConfiguration.isSensitive(secret.key())).isTrue();
+        assertThat(GlobalConfiguration.isSensitive(secret.key(), 
Collections.emptyList())).isTrue();
 
         final Configuration cfg = new Configuration();
         // missing closing quote
@@ -500,7 +501,7 @@ class ConfigurationTest {
         ConfigOption<Map<String, String>> secret =
                 ConfigOptions.key("secret").mapType().noDefaultValue();
 
-        assertThat(GlobalConfiguration.isSensitive(secret.key())).isTrue();
+        assertThat(GlobalConfiguration.isSensitive(secret.key(), 
Collections.emptyList())).isTrue();
 
         final Configuration cfg = new Configuration();
         // malformed map representation
@@ -519,7 +520,7 @@ class ConfigurationTest {
         ConfigOption<Map<String, String>> secret =
                 ConfigOptions.key("secret").mapType().noDefaultValue();
 
-        assertThat(GlobalConfiguration.isSensitive(secret.key())).isTrue();
+        assertThat(GlobalConfiguration.isSensitive(secret.key(), 
Collections.emptyList())).isTrue();
 
         final Configuration cfg = new Configuration();
         cfg.setString(secret.key(), "secret_value");
diff --git 
a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java
 
b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java
index 12c1dab1def..7b4b11496ed 100644
--- 
a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java
@@ -26,6 +26,7 @@ import java.io.File;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -173,7 +174,7 @@ class ConfigurationUtilsTest {
         }
 
         final Map<String, String> hiddenSensitiveValues =
-                ConfigurationUtils.hideSensitiveValues(keyValuePairs);
+                ConfigurationUtils.hideSensitiveValues(keyValuePairs, 
Collections.emptyList());
 
         assertThat(hiddenSensitiveValues).isEqualTo(expectedKeyValuePairs);
     }
diff --git 
a/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
 
b/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
index 5f57777d264..24340dc46ac 100644
--- 
a/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
@@ -26,6 +26,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
@@ -135,29 +136,94 @@ class GlobalConfigurationTest {
 
     @Test
     void testHiddenKey() {
-        assertThat(GlobalConfiguration.isSensitive("password123")).isTrue();
-        assertThat(GlobalConfiguration.isSensitive("123pasSword")).isTrue();
-        assertThat(GlobalConfiguration.isSensitive("PasSword")).isTrue();
-        assertThat(GlobalConfiguration.isSensitive("Secret")).isTrue();
-        
assertThat(GlobalConfiguration.isSensitive("polaris.client-secret")).isTrue();
-        assertThat(GlobalConfiguration.isSensitive("client-secret")).isTrue();
-        
assertThat(GlobalConfiguration.isSensitive("service-key-json")).isTrue();
-        
assertThat(GlobalConfiguration.isSensitive("auth.basic.password")).isTrue();
-        
assertThat(GlobalConfiguration.isSensitive("auth.basic.token")).isTrue();
-        
assertThat(GlobalConfiguration.isSensitive("avro-confluent.basic-auth.user-info")).isTrue();
-        
assertThat(GlobalConfiguration.isSensitive("key.avro-confluent.basic-auth.user-info"))
-                .isTrue();
-        
assertThat(GlobalConfiguration.isSensitive("value.avro-confluent.basic-auth.user-info"))
-                .isTrue();
-        
assertThat(GlobalConfiguration.isSensitive("kafka.jaas.config")).isTrue();
-        
assertThat(GlobalConfiguration.isSensitive("properties.ssl.truststore.password")).isTrue();
-        
assertThat(GlobalConfiguration.isSensitive("properties.ssl.keystore.password")).isTrue();
+        assertThat(GlobalConfiguration.isSensitive("password123", 
Collections.emptyList()))
+                .isTrue();
+        assertThat(GlobalConfiguration.isSensitive("123pasSword", 
Collections.emptyList()))
+                .isTrue();
+        assertThat(GlobalConfiguration.isSensitive("PasSword", 
Collections.emptyList())).isTrue();
+        assertThat(GlobalConfiguration.isSensitive("Secret", 
Collections.emptyList())).isTrue();
+        assertThat(
+                        GlobalConfiguration.isSensitive(
+                                "polaris.client-secret", 
Collections.emptyList()))
+                .isTrue();
+        assertThat(GlobalConfiguration.isSensitive("client-secret", 
Collections.emptyList()))
+                .isTrue();
+        assertThat(GlobalConfiguration.isSensitive("service-key-json", 
Collections.emptyList()))
+                .isTrue();
+        assertThat(GlobalConfiguration.isSensitive("auth.basic.password", 
Collections.emptyList()))
+                .isTrue();
+        assertThat(GlobalConfiguration.isSensitive("auth.basic.token", 
Collections.emptyList()))
+                .isTrue();
+        assertThat(
+                        GlobalConfiguration.isSensitive(
+                                "avro-confluent.basic-auth.user-info", 
Collections.emptyList()))
+                .isTrue();
+        assertThat(
+                        GlobalConfiguration.isSensitive(
+                                "key.avro-confluent.basic-auth.user-info", 
Collections.emptyList()))
+                .isTrue();
+        assertThat(
+                        GlobalConfiguration.isSensitive(
+                                "value.avro-confluent.basic-auth.user-info",
+                                Collections.emptyList()))
+                .isTrue();
+        assertThat(GlobalConfiguration.isSensitive("kafka.jaas.config", 
Collections.emptyList()))
+                .isTrue();
+        assertThat(
+                        GlobalConfiguration.isSensitive(
+                                "properties.ssl.truststore.password", 
Collections.emptyList()))
+                .isTrue();
+        assertThat(
+                        GlobalConfiguration.isSensitive(
+                                "properties.ssl.keystore.password", 
Collections.emptyList()))
+                .isTrue();
+        assertThat(
+                        GlobalConfiguration.isSensitive(
+                                
"fs.azure.account.key.storageaccount123456.core.windows.net",
+                                Collections.emptyList()))
+                .isTrue();
+        assertThat(GlobalConfiguration.isSensitive("Hello", 
Collections.emptyList())).isFalse();
+        assertThat(
+                        GlobalConfiguration.isSensitive(
+                                "metrics.reporter.dghttp.apikey", 
Collections.emptyList()))
+                .isTrue();
 
+        // access-key / access.key / accesskey patterns
+        assertThat(GlobalConfiguration.isSensitive("s3.access-key", 
Collections.emptyList()))
+                .isTrue();
+        assertThat(GlobalConfiguration.isSensitive("fs.s3a.access.key", 
Collections.emptyList()))
+                .isTrue();
+        assertThat(GlobalConfiguration.isSensitive("s3.access.key", 
Collections.emptyList()))
+                .isTrue();
+        assertThat(GlobalConfiguration.isSensitive("fs.oss.accessKeyId", 
Collections.emptyList()))
+                .isTrue();
+        assertThat(GlobalConfiguration.isSensitive("fs.oss.accesskey", 
Collections.emptyList()))
+                .isTrue();
+    }
+
+    @Test
+    void testAdditionalSensitiveKeys() {
+        assertThat(
+                        GlobalConfiguration.isSensitive(
+                                "my.custom.credential",
+                                Arrays.asList("my.custom.credential", 
"VENDOR_TOKEN_ID")))
+                .isTrue();
         assertThat(
                         GlobalConfiguration.isSensitive(
-                                
"fs.azure.account.key.storageaccount123456.core.windows.net"))
+                                "prefix.my.custom.credential.suffix",
+                                Arrays.asList("my.custom.credential")))
                 .isTrue();
-        assertThat(GlobalConfiguration.isSensitive("Hello")).isFalse();
-        
assertThat(GlobalConfiguration.isSensitive("metrics.reporter.dghttp.apikey")).isTrue();
+        // case-insensitive matching
+        assertThat(
+                        GlobalConfiguration.isSensitive(
+                                "vendor_token_id", 
Arrays.asList("VENDOR_TOKEN_ID")))
+                .isTrue();
+        // built-in keys are unaffected when additional list is empty
+        assertThat(GlobalConfiguration.isSensitive("password", 
Collections.emptyList())).isTrue();
+        // unrelated key not matched
+        assertThat(
+                        GlobalConfiguration.isSensitive(
+                                "unrelated.key", 
Arrays.asList("my.custom.credential")))
+                .isFalse();
     }
 }
diff --git 
a/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java 
b/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
index 94c6db2656c..bd816b6424a 100644
--- 
a/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
+++ 
b/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.client.deployment.application.UnsuccessfulExecutionExcep
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.python.util.PythonDependencyUtils;
 import org.apache.flink.util.CompressionUtils;
@@ -342,7 +343,10 @@ final class PythonEnvUtils {
      * @throws IOException Thrown if an error occurred when python process 
start.
      */
     static Process startPythonProcess(
-            PythonEnvironment pythonEnv, List<String> commands, boolean 
redirectToPipe)
+            PythonEnvironment pythonEnv,
+            List<String> commands,
+            boolean redirectToPipe,
+            List<String> additionalSensitiveKeys)
             throws IOException {
         ProcessBuilder pythonProcessBuilder = new ProcessBuilder();
         Map<String, String> env = pythonProcessBuilder.environment();
@@ -372,7 +376,9 @@ final class PythonEnvUtils {
         }
         LOG.info(
                 "Starting Python process with environment variables: {{}}, 
command: {}",
-                ConfigurationUtils.hideSensitiveValues(env).entrySet().stream()
+                ConfigurationUtils.hideSensitiveValues(env, 
additionalSensitiveKeys)
+                        .entrySet()
+                        .stream()
                         .map(e -> e.getKey() + "=" + e.getValue())
                         .collect(Collectors.joining(", ")),
                 String.join(" ", commands));
@@ -495,7 +501,11 @@ final class PythonEnvUtils {
         pythonEnv.systemEnv.put(
                 "PYFLINK_GATEWAY_PORT", 
String.valueOf(gatewayServer.getListeningPort()));
         // start the python process.
-        return PythonEnvUtils.startPythonProcess(pythonEnv, commands, 
redirectToPipe);
+        return PythonEnvUtils.startPythonProcess(
+                pythonEnv,
+                commands,
+                redirectToPipe,
+                config.get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS));
     }
 
     public static void setPythonException(Throwable pythonException) {
diff --git 
a/flink-python/src/test/java/org/apache/flink/client/python/PythonEnvUtilsTest.java
 
b/flink-python/src/test/java/org/apache/flink/client/python/PythonEnvUtilsTest.java
index 7f45d4e3bc5..cf79a9a377d 100644
--- 
a/flink-python/src/test/java/org/apache/flink/client/python/PythonEnvUtilsTest.java
+++ 
b/flink-python/src/test/java/org/apache/flink/client/python/PythonEnvUtilsTest.java
@@ -38,6 +38,7 @@ import java.nio.file.Paths;
 import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -160,7 +161,9 @@ class PythonEnvUtilsTest {
             String result = String.join(File.separator, tmpDirPath, 
"python_working_directory.txt");
             commands.add(pyPath);
             commands.add(result);
-            Process pythonProcess = 
PythonEnvUtils.startPythonProcess(pythonEnv, commands, false);
+            Process pythonProcess =
+                    PythonEnvUtils.startPythonProcess(
+                            pythonEnv, commands, false, 
Collections.emptyList());
             int exitCode = pythonProcess.waitFor();
             if (exitCode != 0) {
                 throw new RuntimeException("Python process exits with code: " 
+ exitCode);
diff --git 
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java
 
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java
index a206e4ef08b..9d6ff5e8b15 100644
--- 
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java
+++ 
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java
@@ -32,6 +32,7 @@ import org.slf4j.Logger;
 
 import java.io.IOException;
 import java.net.BindException;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Optional;
@@ -257,7 +258,8 @@ public class ActorSystemBootstrapTools {
                         .collect(
                                 Collectors.toMap(
                                         Map.Entry::getKey,
-                                        entry -> String.valueOf(entry.getValue().unwrapped()))));
+                                        entry -> String.valueOf(entry.getValue().unwrapped()))),
+                Collections.emptyList());
     }
 
     /**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
index 649f3cee13e..e8650e1ca56 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
@@ -37,6 +37,7 @@ import java.io.IOException;
 import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
 
@@ -45,13 +46,16 @@ public class JobConfigHandler
         extends AbstractAccessExecutionGraphHandler<JobConfigInfo, JobMessageParameters>
         implements OnlyExecutionGraphJsonArchivist {
 
+    private final List<String> additionalSensitiveKeys;
+
     public JobConfigHandler(
             GatewayRetriever<? extends RestfulGateway> leaderRetriever,
             Duration timeout,
             Map<String, String> responseHeaders,
             MessageHeaders<EmptyRequestBody, JobConfigInfo, JobMessageParameters> messageHeaders,
             ExecutionGraphCache executionGraphCache,
-            Executor executor) {
+            Executor executor,
+            List<String> additionalSensitiveKeys) {
 
         super(
                 leaderRetriever,
@@ -60,6 +64,7 @@ public class JobConfigHandler
                 messageHeaders,
                 executionGraphCache,
                 executor);
+        this.additionalSensitiveKeys = additionalSensitiveKeys;
     }
 
     @Override
@@ -79,12 +84,14 @@ public class JobConfigHandler
         return Collections.singleton(new ArchivedJson(path, json));
     }
 
-    private static JobConfigInfo createJobConfigInfo(AccessExecutionGraph executionGraph) {
+    private JobConfigInfo createJobConfigInfo(AccessExecutionGraph executionGraph) {
         final ArchivedExecutionConfig executionConfig = executionGraph.getArchivedExecutionConfig();
         final JobConfigInfo.ExecutionConfigInfo executionConfigInfo;
 
         if (executionConfig != null) {
-            executionConfigInfo = JobConfigInfo.ExecutionConfigInfo.from(executionConfig);
+            executionConfigInfo =
+                    JobConfigInfo.ExecutionConfigInfo.from(
+                            executionConfig, additionalSensitiveKeys);
         } else {
             executionConfigInfo = null;
         }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ConfigurationInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ConfigurationInfo.java
index e0dcd23ae47..82abe13c6f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ConfigurationInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ConfigurationInfo.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest.messages;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.rest.handler.cluster.ClusterConfigHandler;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
@@ -51,7 +52,8 @@ public class ConfigurationInfo extends ArrayList<ConfigurationInfoEntry> impleme
     public static ConfigurationInfo from(Configuration config) {
         final ConfigurationInfo clusterConfig = new ConfigurationInfo(config.keySet().size());
         final Map<String, String> configurationWithHiddenSensitiveValues =
-                ConfigurationUtils.hideSensitiveValues(config.toMap());
+                ConfigurationUtils.hideSensitiveValues(
+                        config.toMap(), config.get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS));
 
         for (Map.Entry<String, String> keyValuePair :
                 configurationWithHiddenSensitiveValues.entrySet()) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobConfigInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobConfigInfo.java
index 77d007cf2a2..80d62bab8d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobConfigInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobConfigInfo.java
@@ -41,6 +41,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.S
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
@@ -235,13 +236,16 @@ public class JobConfigInfo implements ResponseBody {
             return Objects.hash(restartStrategy, parallelism, isObjectReuse, globalJobParameters);
         }
 
-        public static ExecutionConfigInfo from(ArchivedExecutionConfig archivedExecutionConfig) {
+        public static ExecutionConfigInfo from(
+                ArchivedExecutionConfig archivedExecutionConfig,
+                List<String> additionalSensitiveKeys) {
             return new ExecutionConfigInfo(
                     archivedExecutionConfig.getRestartStrategyDescription(),
                     archivedExecutionConfig.getParallelism(),
                     archivedExecutionConfig.getObjectReuseEnabled(),
                     ConfigurationUtils.hideSensitiveValues(
-                            archivedExecutionConfig.getGlobalJobParameters()));
+                            archivedExecutionConfig.getGlobalJobParameters(),
+                            additionalSensitiveKeys));
         }
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
index eb17d223540..96c9704e2e8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
@@ -34,6 +34,7 @@ import java.time.Instant;
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeParseException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 
@@ -466,7 +467,7 @@ public class EnvironmentInformation {
             } else {
                 log.info(" Program Arguments:");
                 for (String s : commandLineArgs) {
-                    if (GlobalConfiguration.isSensitive(s)) {
+                    if (GlobalConfiguration.isSensitive(s, Collections.emptyList())) {
                         log.info(
                                 "    "
                                         + GlobalConfiguration.HIDDEN_CONTENT
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 9e3f295e6e7..16cf06d4dd9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.RpcOptions;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.application.ArchivedApplication;
 import org.apache.flink.runtime.blob.TransientBlobService;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
@@ -402,7 +403,8 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
                         responseHeaders,
                         JobConfigHeaders.getInstance(),
                         executionGraphCache,
-                        executor);
+                        executor,
+                        clusterConfiguration.get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS));
 
         JobManagerJobConfigurationHandler jobManagerJobConfigurationHandler =
                 new JobManagerJobConfigurationHandler(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandlerTest.java
index c2d2968e5ed..b0a4e69f87f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandlerTest.java
@@ -56,7 +56,8 @@ class JobConfigHandlerTest {
                         Collections.emptyMap(),
                         JobConfigHeaders.getInstance(),
                         new DefaultExecutionGraphCache(TestingUtils.TIMEOUT, TestingUtils.TIMEOUT),
-                        Executors.directExecutor());
+                        Executors.directExecutor(),
+                        Collections.emptyList());
 
         final Map<String, String> globalJobParameters = new HashMap<>();
         globalJobParameters.put("foobar", "barfoo");
@@ -85,7 +86,7 @@ class JobConfigHandlerTest {
     }
 
     private Map<String, String> filterSecretValues(Map<String, String> globalJobParameters) {
-        return ConfigurationUtils.hideSensitiveValues(globalJobParameters);
+        return ConfigurationUtils.hideSensitiveValues(globalJobParameters, Collections.emptyList());
     }
 
     private HandlerRequest<EmptyRequestBody> createRequest(JobID jobId)
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
index 594a7bf1d5d..a61da0902ae 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.DelegatingConfiguration;
 import org.apache.flink.configuration.FallbackKey;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Catalog;
@@ -248,7 +249,14 @@ public final class FactoryUtil {
                                     + "%s",
                             objectIdentifier.asSummaryString(),
                             catalogTable.getOptions().entrySet().stream()
-                                    .map(e -> stringifyOption(e.getKey(), e.getValue()))
+                                    .map(
+                                            e ->
+                                                    stringifyOption(
+                                                            e.getKey(),
+                                                            e.getValue(),
+                                                            configuration.get(
+                                                                    SecurityOptions
+                                                                            .ADDITIONAL_SENSITIVE_KEYS)))
                                     .sorted()
                                     .collect(Collectors.joining("\n"))),
                     t);
@@ -294,7 +302,14 @@ public final class FactoryUtil {
                                     + "%s",
                             objectIdentifier.asSummaryString(),
                             catalogTable.getOptions().entrySet().stream()
-                                    .map(e -> stringifyOption(e.getKey(), e.getValue()))
+                                    .map(
+                                            e ->
+                                                    stringifyOption(
+                                                            e.getKey(),
+                                                            e.getValue(),
+                                                            configuration.get(
+                                                                    SecurityOptions
+                                                                            .ADDITIONAL_SENSITIVE_KEYS)))
                                     .sorted()
                                     .collect(Collectors.joining("\n"))),
                     t);
@@ -334,7 +349,14 @@ public final class FactoryUtil {
                                     + "%s",
                             objectIdentifier.asSummaryString(),
                             catalogModel.getOptions().entrySet().stream()
-                                    .map(e -> stringifyOption(e.getKey(), e.getValue()))
+                                    .map(
+                                            e ->
+                                                    stringifyOption(
+                                                            e.getKey(),
+                                                            e.getValue(),
+                                                            configuration.get(
+                                                                    SecurityOptions
+                                                                            .ADDITIONAL_SENSITIVE_KEYS)))
                                     .sorted()
                                     .collect(Collectors.joining("\n"))),
                     t);
@@ -460,7 +482,10 @@ public final class FactoryUtil {
                                             optionEntry ->
                                                     stringifyOption(
                                                             optionEntry.getKey(),
-                                                            optionEntry.getValue()))
+                                                            optionEntry.getValue(),
+                                                            configuration.get(
+                                                                    SecurityOptions
+                                                                            .ADDITIONAL_SENSITIVE_KEYS)))
                                     .sorted()
                                     .collect(Collectors.joining("\n"))),
                     t);
@@ -509,7 +534,10 @@ public final class FactoryUtil {
                                             optionEntry ->
                                                     stringifyOption(
                                                             optionEntry.getKey(),
-                                                            optionEntry.getValue()))
+                                                            optionEntry.getValue(),
+                                                            configuration.get(
+                                                                    SecurityOptions
+                                                                            .ADDITIONAL_SENSITIVE_KEYS)))
                                     .sorted()
                                     .collect(Collectors.joining("\n"))),
                     t);
@@ -756,7 +784,11 @@ public final class FactoryUtil {
             return new ValidationException(
                     String.format(
                             "Cannot discover a connector using option: %s",
-                            stringifyOption(CONNECTOR.key(), connectorOption)),
+                            stringifyOption(
+                                    CONNECTOR.key(),
+                                    connectorOption,
+                                    context.getConfiguration()
+                                            .get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS))),
                     e);
         }
 
@@ -818,8 +850,9 @@ public final class FactoryUtil {
         return loadResults;
     }
 
-    public static String stringifyOption(String key, String value) {
-        if (GlobalConfiguration.isSensitive(key)) {
+    public static String stringifyOption(
+            String key, String value, List<String> additionalSensitiveKeys) {
+        if (GlobalConfiguration.isSensitive(key, additionalSensitiveKeys)) {
             value = HIDDEN_CONTENT;
         }
         return String.format(
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/WorkflowSchedulerFactoryUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/WorkflowSchedulerFactoryUtil.java
index e8d1a6333f6..91321a0c1e5 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/WorkflowSchedulerFactoryUtil.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/WorkflowSchedulerFactoryUtil.java
@@ -23,6 +23,7 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DelegatingConfiguration;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.workflow.WorkflowScheduler;
 import org.apache.flink.util.StringUtils;
@@ -84,7 +85,10 @@ public class WorkflowSchedulerFactoryUtil {
                                             optionEntry ->
                                                     stringifyOption(
                                                             optionEntry.getKey(),
-                                                            optionEntry.getValue()))
+                                                            optionEntry.getValue(),
+                                                            configuration.get(
+                                                                    SecurityOptions
+                                                                            .ADDITIONAL_SENSITIVE_KEYS)))
                                     .sorted()
                                     .collect(Collectors.joining("\n"))),
                     t);
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 66fe7b1b5ea..84cad5d79eb 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -784,7 +784,8 @@ public class FlinkYarnSessionCli extends AbstractYarnCli {
                                     LOG.info(
                                             "Dynamic Property set: {}={}",
                                             key,
-                                            GlobalConfiguration.isSensitive(key)
+                                            GlobalConfiguration.isSensitive(
+                                                            key, Collections.emptyList())
                                                     ? GlobalConfiguration.HIDDEN_CONTENT
                                                     : value);
 

Reply via email to