This is an automated email from the ASF dual-hosted git repository.
gaborgsomogyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f008d0ee0ab [FLINK-39685][table] Redact sensitive options in SHOW
CREATE and DESCRIBE CATALOG statements
f008d0ee0ab is described below
commit f008d0ee0ab9c066d40115879ae601523a1c2615
Author: Gabor Somogyi <[email protected]>
AuthorDate: Mon May 18 17:19:52 2026 +0200
[FLINK-39685][table] Redact sensitive options in SHOW CREATE and DESCRIBE
CATALOG statements
---
.../flink/table/api/internal/ShowCreateUtil.java | 67 ++++++-------
.../table/operations/DescribeCatalogOperation.java | 19 +++-
.../operations/ShowCreateCatalogOperation.java | 6 +-
.../ShowCreateMaterializedTableOperation.java | 4 +-
.../table/operations/ShowCreateModelOperation.java | 7 +-
.../table/operations/ShowCreateTableOperation.java | 4 +-
.../table/api/internal/ShowCreateUtilTest.java | 107 ++++++++++++++++++++-
.../flink/table/catalog/DefaultCatalogModel.java | 4 +-
.../flink/table/catalog/DefaultCatalogTable.java | 3 +-
.../table/catalog/DefaultCatalogTableTest.java | 84 ++++++++++++++++
.../table/planner/catalog/DescribeCatalogTest.java | 81 ++++++++++++++++
11 files changed, 334 insertions(+), 52 deletions(-)
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
index dfc9fef79c5..9ea15b1eca8 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.api.internal;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
@@ -67,7 +68,10 @@ public class ShowCreateUtil {
private ShowCreateUtil() {}
public static String buildShowCreateModelRow(
- ResolvedCatalogModel model, ObjectIdentifier modelIdentifier,
boolean isTemporary) {
+ ResolvedCatalogModel model,
+ ObjectIdentifier modelIdentifier,
+ boolean isTemporary,
+ List<String> additionalSensitiveKeys) {
StringBuilder sb =
new StringBuilder()
.append(
@@ -81,7 +85,7 @@ public class ShowCreateUtil {
c -> sb.append(String.format("OUTPUT (%s)%s", c,
System.lineSeparator())));
extractComment(model)
.ifPresent(c ->
sb.append(formatComment(c)).append(System.lineSeparator()));
- extractFormattedOptions(model.getOptions(), PRINT_INDENT)
+ extractFormattedOptions(model.getOptions(), PRINT_INDENT,
additionalSensitiveKeys)
.ifPresent(
v ->
sb.append(String.format("WITH (%s",
System.lineSeparator()))
@@ -98,7 +102,8 @@ public class ShowCreateUtil {
ResolvedCatalogBaseTable<?> table,
ObjectIdentifier tableIdentifier,
boolean isTemporary,
- SqlFactory sqlFactory) {
+ SqlFactory sqlFactory,
+ List<String> additionalSensitiveKeys) {
validateTableKind(table, tableIdentifier, TableKind.TABLE);
StringBuilder sb =
new StringBuilder()
@@ -118,7 +123,7 @@ public class ShowCreateUtil {
.ifPresent(
partitionedInfoFormatted ->
sb.append(formatPartitionedBy(partitionedInfoFormatted)));
- extractFormattedOptions(table.getOptions(), PRINT_INDENT)
+ extractFormattedOptions(table.getOptions(), PRINT_INDENT,
additionalSensitiveKeys)
.ifPresent(v -> sb.append("WITH
(\n").append(v).append("\n)\n"));
return sb.toString();
}
@@ -130,7 +135,8 @@ public class ShowCreateUtil {
boolean isTemporary,
boolean createOrAlter,
ZoneId timeZoneId,
- SqlFactory sqlFactory) {
+ SqlFactory sqlFactory,
+ List<String> additionalSensitiveKeys) {
return buildShowCreateMaterializedTableRow(
table,
tableIdentifier,
@@ -139,7 +145,8 @@ public class ShowCreateUtil {
timeZoneId,
sqlFactory,
true,
- true);
+ true,
+ additionalSensitiveKeys);
}
/** Show create materialized table statement only for materialized tables.
*/
@@ -151,7 +158,8 @@ public class ShowCreateUtil {
ZoneId timeZoneId,
SqlFactory sqlFactory,
boolean includeFreshness,
- boolean includeRefreshMode) {
+ boolean includeRefreshMode,
+ List<String> additionalSensitiveKeys) {
validateTableKind(table, tableIdentifier,
TableKind.MATERIALIZED_TABLE);
StringBuilder sb =
new StringBuilder()
@@ -174,7 +182,7 @@ public class ShowCreateUtil {
.ifPresent(d -> sb.append(d).append("\n"));
extractFormattedPartitionedInfo(table)
.ifPresent(partitionedBy ->
sb.append(formatPartitionedBy(partitionedBy)));
- extractFormattedOptions(table.getOptions(), PRINT_INDENT)
+ extractFormattedOptions(table.getOptions(), PRINT_INDENT,
additionalSensitiveKeys)
.ifPresent(v -> sb.append("WITH
(\n").append(v).append("\n)\n"));
sb.append(extractStartMode(table, timeZoneId)).append("\n");
if (includeFreshness) {
@@ -211,14 +219,18 @@ public class ShowCreateUtil {
return sb.toString();
}
- public static String buildShowCreateCatalogRow(CatalogDescriptor
catalogDescriptor) {
+ public static String buildShowCreateCatalogRow(
+ CatalogDescriptor catalogDescriptor, List<String>
additionalSensitiveKeys) {
final Optional<String> comment = catalogDescriptor.getComment();
StringBuilder sb = new StringBuilder();
sb.append("CREATE CATALOG ")
.append(EncodingUtils.escapeIdentifier(catalogDescriptor.getCatalogName()))
.append("\n");
comment.ifPresent(c -> sb.append(formatComment(c)).append("\n"));
- extractFormattedOptions(catalogDescriptor.getConfiguration().toMap(),
PRINT_INDENT)
+ extractFormattedOptions(
+ catalogDescriptor.getConfiguration().toMap(),
+ PRINT_INDENT,
+ additionalSensitiveKeys)
.ifPresent(o -> sb.append("WITH
(\n").append(o).append("\n)\n"));
return sb.toString();
}
@@ -432,7 +444,8 @@ public class ShowCreateUtil {
return TIMESTAMP_FORMATTER.format(LocalDateTime.ofInstant(instant,
timeZone));
}
- static Optional<String> extractFormattedOptions(Map<String, String> conf,
String printIndent) {
+ static Optional<String> extractFormattedOptions(
+ Map<String, String> conf, String printIndent, List<String>
additionalSensitiveKeys) {
if (Objects.isNull(conf) || conf.isEmpty()) {
return Optional.empty();
}
@@ -446,7 +459,12 @@ public class ShowCreateUtil {
"%s'%s' = '%s'",
printIndent,
EncodingUtils.escapeSingleQuotes(entry),
-
EncodingUtils.escapeSingleQuotes(conf.get(entry))))
+
EncodingUtils.escapeSingleQuotes(
+
GlobalConfiguration.isSensitive(
+ entry,
+
additionalSensitiveKeys)
+ ?
GlobalConfiguration.HIDDEN_CONTENT
+ :
conf.get(entry))))
.collect(Collectors.joining(",\n")));
}
@@ -462,31 +480,6 @@ public class ShowCreateUtil {
.collect(Collectors.joining(",\n"));
}
- private static String maybeLowerCaseKey(String key, boolean lowerCaseKey) {
- return lowerCaseKey ? key.toLowerCase() : key;
- }
-
- static Optional<String> extractFormattedOptions(
- Map<String, String> conf, String printIndent, boolean
lowerCaseKeys) {
- if (Objects.isNull(conf) || conf.isEmpty()) {
- return Optional.empty();
- }
- return Optional.of(
- conf.entrySet().stream()
- .map(
- entry ->
- String.format(
- "%s'%s' = '%s'",
- printIndent,
- maybeLowerCaseKey(
-
EncodingUtils.escapeSingleQuotes(
-
entry.getKey()),
- lowerCaseKeys),
-
EncodingUtils.escapeSingleQuotes(entry.getValue())))
- .sorted()
- .collect(Collectors.joining("," +
System.lineSeparator())));
- }
-
private static void validateTableKind(
ResolvedCatalogBaseTable<?> table,
ObjectIdentifier tableIdentifier,
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeCatalogOperation.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeCatalogOperation.java
index 08a97d10bec..9d1990fe67d 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeCatalogOperation.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeCatalogOperation.java
@@ -19,6 +19,8 @@
package org.apache.flink.table.operations;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.internal.TableResultInternal;
@@ -87,17 +89,24 @@ public class DescribeCatalogOperation implements Operation,
ExecutableOperation
Arrays.asList(
"comment",
catalogDescriptor.getComment().orElse(null))));
if (isExtended) {
+ List<String> additionalSensitiveKeys =
+
ctx.getTableConfig().get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS);
properties.entrySet().stream()
.filter(
entry ->
!CommonCatalogOptions.CATALOG_TYPE.key().equals(entry.getKey()))
.sorted(Map.Entry.comparingByKey())
.forEach(
- entry ->
- rows.add(
- Arrays.asList(
- String.format("option:%s",
entry.getKey()),
- entry.getValue())));
+ entry -> {
+ String value =
+ GlobalConfiguration.isSensitive(
+ entry.getKey(),
additionalSensitiveKeys)
+ ?
GlobalConfiguration.HIDDEN_CONTENT
+ : entry.getValue();
+ rows.add(
+ Arrays.asList(
+ String.format("option:%s",
entry.getKey()), value));
+ });
}
return buildTableResult(
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateCatalogOperation.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateCatalogOperation.java
index 60baecb40a8..fca7b39a5d9 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateCatalogOperation.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateCatalogOperation.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.operations;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.internal.ShowCreateUtil;
import org.apache.flink.table.api.internal.TableResultInternal;
@@ -57,7 +58,10 @@ public class ShowCreateCatalogOperation implements
ShowOperation {
String.format(
"Cannot obtain
metadata information from Catalog %s.",
catalogName)));
- String resultRow =
ShowCreateUtil.buildShowCreateCatalogRow(catalogDescriptor);
+ String resultRow =
+ ShowCreateUtil.buildShowCreateCatalogRow(
+ catalogDescriptor,
+
ctx.getTableConfig().get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS));
return buildStringArrayResult("result", new String[] {resultRow});
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateMaterializedTableOperation.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateMaterializedTableOperation.java
index 0ca8a1475ce..1165c44e4bb 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateMaterializedTableOperation.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateMaterializedTableOperation.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.operations;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.internal.ShowCreateUtil;
import org.apache.flink.table.api.internal.TableResultInternal;
@@ -65,7 +66,8 @@ public class ShowCreateMaterializedTableOperation implements
ShowOperation {
table.isTemporary(),
createOrAlter,
ctx.getTableConfig().getLocalTimeZone(),
- ctx.getCatalogManager().getSqlFactory());
+ ctx.getCatalogManager().getSqlFactory(),
+
ctx.getTableConfig().get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS));
return buildStringArrayResult("result", new String[] {resultRow});
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateModelOperation.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateModelOperation.java
index d94471ba0c3..ea0bd14174a 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateModelOperation.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateModelOperation.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.operations;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.table.api.internal.ShowCreateUtil;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -53,7 +54,11 @@ public class ShowCreateModelOperation implements
ShowOperation {
@Override
public TableResultInternal execute(Context ctx) {
String resultRow =
- ShowCreateUtil.buildShowCreateModelRow(model, modelIdentifier,
isTemporary);
+ ShowCreateUtil.buildShowCreateModelRow(
+ model,
+ modelIdentifier,
+ isTemporary,
+
ctx.getTableConfig().get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS));
return buildStringArrayResult("result", new String[] {resultRow});
}
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateTableOperation.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateTableOperation.java
index 6608ff263b9..d0144f7abb5 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateTableOperation.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateTableOperation.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.operations;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.internal.ShowCreateUtil;
import org.apache.flink.table.api.internal.TableResultInternal;
@@ -62,7 +63,8 @@ public class ShowCreateTableOperation implements
ShowOperation {
table.getResolvedTable(),
tableIdentifier,
table.isTemporary(),
- ctx.getCatalogManager().getSqlFactory());
+ ctx.getCatalogManager().getSqlFactory(),
+
ctx.getTableConfig().get(SecurityOptions.ADDITIONAL_SENSITIVE_KEYS));
return buildStringArrayResult("result", new String[] {resultRow});
}
diff --git
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java
index 69df0555728..750818181d4 100644
---
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java
+++
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java
@@ -45,6 +45,7 @@ import org.apache.flink.table.catalog.TableDistribution;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.expressions.DefaultSqlFactory;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
@@ -112,7 +113,8 @@ class ShowCreateUtilTest {
resolvedCatalogTable,
TABLE_IDENTIFIER,
isTemporary,
- DefaultSqlFactory.INSTANCE);
+ DefaultSqlFactory.INSTANCE,
+ List.of());
assertThat(createTableString).isEqualTo(expected);
}
@@ -139,7 +141,8 @@ class ShowCreateUtilTest {
false,
createOrAlter,
ZoneOffset.UTC,
- DefaultSqlFactory.INSTANCE);
+ DefaultSqlFactory.INSTANCE,
+ List.of());
final String fixedTimestamp = "1970-01-02 12:34:56";
final String normalizedMTString =
setFixedTimestamp(createMaterializedTableString,
fixedTimestamp);
@@ -171,7 +174,8 @@ class ShowCreateUtilTest {
ZoneOffset.UTC,
DefaultSqlFactory.INSTANCE,
includeFreshness,
- includeRefreshMode);
+ includeRefreshMode,
+ List.of());
final StringBuilder expected =
new StringBuilder()
@@ -195,7 +199,7 @@ class ShowCreateUtilTest {
@MethodSource("argsForShowCreateCatalog")
void showCreateCatalog(CatalogDescriptor catalogDescriptor, String
expected) {
final String createCatalogString =
- ShowCreateUtil.buildShowCreateCatalogRow(catalogDescriptor);
+ ShowCreateUtil.buildShowCreateCatalogRow(catalogDescriptor,
List.of());
assertThat(createCatalogString).isEqualTo(expected);
}
@@ -228,6 +232,19 @@ class ShowCreateUtilTest {
+ " 'k_c' = 'v_c'\n"
+ ")\n"));
+ final Map<String, String> sensitiveCatalogOptions = new HashMap<>();
+ sensitiveCatalogOptions.put("type", "hive");
+ sensitiveCatalogOptions.put("hive.metastore.token", "tok123");
+ argList.add(
+ Arguments.of(
+ CatalogDescriptor.of(
+ "catalogName",
Configuration.fromMap(sensitiveCatalogOptions)),
+ "CREATE CATALOG `catalogName`\n"
+ + "WITH (\n"
+ + " 'hive.metastore.token' = '******',\n"
+ + " 'type' = 'hive'\n"
+ + ")\n"));
+
return argList;
}
@@ -354,6 +371,22 @@ class ShowCreateUtilTest {
+ " 'option_key_b' = 'option_value_b',\n"
+ " 'option_key_c' = 'option_value_c'\n"
+ ")\n");
+
+ final Map<String, String> sensitiveOptions = new HashMap<>();
+ sensitiveOptions.put("connector", "kafka");
+ sensitiveOptions.put("my.api-key", "abc123");
+ sensitiveOptions.put("password", "topsecret");
+ addTemporaryAndPermanent(
+ argList,
+ createResolvedTable(ONE_COLUMN_SCHEMA, sensitiveOptions,
List.of(), null, null),
+ "CREATE %sTABLE `catalogName`.`dbName`.`tableName` (\n"
+ + " `id` INT\n"
+ + ")\n"
+ + "WITH (\n"
+ + " 'connector' = 'kafka',\n"
+ + " 'my.api-key' = '******',\n"
+ + " 'password' = '******'\n"
+ + ")\n");
return argList;
}
@@ -477,9 +510,51 @@ class ShowCreateUtilTest {
+ "REFRESH_MODE = FULL\n"
+ "AS SELECT id, name FROM
`catalogName`.`dbName`.`tbl_a`\n");
+ addCreateAndCreateOrAlter(
+ argList,
+ createResolvedMaterializedWithOptions(
+ ONE_COLUMN_SCHEMA,
+ Map.of("connector", "kafka", "secret.key", "mysecret"),
+ StartMode.of(StartModeKind.FROM_BEGINNING),
+ IntervalFreshness.ofMinute(2),
+ RefreshMode.CONTINUOUS,
+ "SELECT 1"),
+ "%sMATERIALIZED TABLE
`catalogName`.`dbName`.`materializedTableName` (\n"
+ + " `id` INT\n"
+ + ")\n"
+ + "WITH (\n"
+ + " 'connector' = 'kafka',\n"
+ + " 'secret.key' = '******'\n"
+ + ")\n"
+ + "START_MODE = FROM_BEGINNING\n"
+ + "FRESHNESS = INTERVAL '2' MINUTE\n"
+ + "REFRESH_MODE = CONTINUOUS\n"
+ + "AS SELECT 1\n");
+
return argList;
}
+ @Test
+ void showCreateTableRedactsAdditionalSensitiveKeys() {
+ Map<String, String> options = new HashMap<>();
+ options.put("connector", "kafka");
+ options.put("my.custom.cred", "supersecret");
+ ResolvedCatalogTable table =
+ createResolvedTable(ONE_COLUMN_SCHEMA, options, List.of(),
null, null);
+
+ String result =
+ ShowCreateUtil.buildShowCreateTableRow(
+ table,
+ TABLE_IDENTIFIER,
+ false,
+ DefaultSqlFactory.INSTANCE,
+ List.of("custom.cred"));
+
+ assertThat(result).contains("'connector' = 'kafka'");
+ assertThat(result).contains("'my.custom.cred' = '******'");
+ assertThat(result).doesNotContain("supersecret");
+ }
+
private static void addTemporaryAndPermanent(
Collection<Arguments> argList, CatalogBaseTable catalogBaseTable,
String sql) {
argList.add(Arguments.of(catalogBaseTable, false, String.format(sql,
"")));
@@ -554,6 +629,30 @@ class ShowCreateUtilTest {
startMode);
}
+ private static ResolvedCatalogMaterializedTable
createResolvedMaterializedWithOptions(
+ ResolvedSchema resolvedSchema,
+ Map<String, String> options,
+ StartMode startMode,
+ IntervalFreshness freshness,
+ RefreshMode refreshMode,
+ String query) {
+ return new ResolvedCatalogMaterializedTable(
+ CatalogMaterializedTable.newBuilder()
+
.schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build())
+ .options(options)
+ .freshness(freshness)
+ .refreshMode(refreshMode)
+ .originalQuery(query)
+ .expandedQuery(query)
+ .logicalRefreshMode(LogicalRefreshMode.AUTOMATIC)
+ .refreshStatus(RefreshStatus.ACTIVATED)
+ .build(),
+ resolvedSchema,
+ refreshMode,
+ freshness,
+ startMode);
+ }
+
private static String setFixedTimestamp(String sql, String fixedTimestamp)
{
return START_MODE_EVALUATED_TIMESTAMP
.matcher(sql)
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogModel.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogModel.java
index cc39813ec13..e6b38f8ff94 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogModel.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogModel.java
@@ -19,10 +19,12 @@
package org.apache.flink.table.catalog;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.table.api.Schema;
import javax.annotation.Nullable;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -107,7 +109,7 @@ public class DefaultCatalogModel implements CatalogModel {
+ ", outputSchema="
+ outputSchema
+ ", modelOptions="
- + modelOptions
+ + ConfigurationUtils.hideSensitiveValues(modelOptions,
List.of())
+ ", comment="
+ comment
+ "}";
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogTable.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogTable.java
index b06e6acee26..f94c2399acc 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogTable.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogTable.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.catalog;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.table.api.Schema;
import javax.annotation.Nullable;
@@ -162,7 +163,7 @@ public class DefaultCatalogTable implements CatalogTable {
+ ", partitionKeys="
+ partitionKeys
+ ", options="
- + options
+ + ConfigurationUtils.hideSensitiveValues(options, List.of())
+ ", snapshot="
+ snapshot
+ '}';
diff --git
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/DefaultCatalogTableTest.java
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/DefaultCatalogTableTest.java
new file mode 100644
index 00000000000..03301ba5cd6
--- /dev/null
+++
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/DefaultCatalogTableTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link DefaultCatalogTable}. */
+class DefaultCatalogTableTest {
+
+ @Test
+ void toStringRedactsSensitiveOptions() {
+ Map<String, String> options = new HashMap<>();
+ options.put("connector", "kafka");
+ options.put("password", "topsecret");
+ options.put("my.token", "tok123");
+
+ CatalogTable table =
+ CatalogTable.newBuilder()
+ .schema(Schema.newBuilder().column("id",
DataTypes.INT()).build())
+ .options(options)
+ .build();
+
+ String result = table.toString();
+
+ assertThat(result).contains("connector=kafka");
+ assertThat(result).doesNotContain("topsecret");
+ assertThat(result).doesNotContain("tok123");
+ assertThat(result).contains("password=******");
+ assertThat(result).contains("my.token=******");
+ }
+
+ @Test
+ void toStringKeepsNonSensitiveOptions() {
+ Map<String, String> options = new HashMap<>();
+ options.put("connector", "kafka");
+ options.put("topic", "my-topic");
+
+ CatalogTable table =
+ CatalogTable.newBuilder()
+ .schema(Schema.newBuilder().column("id",
DataTypes.INT()).build())
+ .options(options)
+ .build();
+
+ String result = table.toString();
+
+ assertThat(result).contains("connector=kafka");
+ assertThat(result).contains("topic=my-topic");
+ }
+
+ @Test
+ void toStringWithEmptyOptionsDoesNotFail() {
+ CatalogTable table =
+ CatalogTable.newBuilder()
+ .schema(Schema.newBuilder().column("id",
DataTypes.INT()).build())
+ .options(Map.of())
+ .build();
+
+ assertThat(table.toString()).contains("options={}");
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/DescribeCatalogTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/DescribeCatalogTest.java
new file mode 100644
index 00000000000..e27e18813f4
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/DescribeCatalogTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.catalog;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.CatalogDescriptor;
+import org.apache.flink.table.catalog.GenericInMemoryCatalogStore;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@code DESCRIBE CATALOG} SQL statement, focusing on
sensitive-option redaction. */
+class DescribeCatalogTest {
+
+ private static final String CATALOG_NAME = "test_cat";
+
+ @Test
+ void describeCatalogExtendedRedactsSensitiveOptions() throws Exception {
+ TableEnvironment tEnv = buildEnvWithSensitiveOptions();
+
+ List<Row> rows =
+ CollectionUtil.iteratorToList(
+ tEnv.executeSql("DESCRIBE CATALOG EXTENDED " +
CATALOG_NAME).collect());
+
+ assertThat(rows)
+ .contains(
+ Row.of("option:password",
GlobalConfiguration.HIDDEN_CONTENT),
+ Row.of("option:my.token",
GlobalConfiguration.HIDDEN_CONTENT),
+ Row.of("option:safe-option", "safe-value"))
+ .noneMatch(r -> "topsecret".equals(r.getField(1)))
+ .noneMatch(r -> "tok123".equals(r.getField(1)));
+ }
+
+ @Test
+ void describeCatalogNonExtendedDoesNotExposeOptions() throws Exception {
+ TableEnvironment tEnv = buildEnvWithSensitiveOptions();
+
+ List<Row> rows =
+ CollectionUtil.iteratorToList(
+ tEnv.executeSql("DESCRIBE CATALOG " +
CATALOG_NAME).collect());
+
+ assertThat(rows).noneMatch(r ->
String.valueOf(r.getField(0)).startsWith("option:"));
+ }
+
+ private static TableEnvironment buildEnvWithSensitiveOptions() throws
Exception {
+ GenericInMemoryCatalogStore catalogStore = new
GenericInMemoryCatalogStore();
+ catalogStore.open();
+ Configuration config = new Configuration();
+ config.setString("type", "generic_in_memory");
+ config.setString("safe-option", "safe-value");
+ config.setString("password", "topsecret");
+ config.setString("my.token", "tok123");
+ catalogStore.storeCatalog(CATALOG_NAME,
CatalogDescriptor.of(CATALOG_NAME, config));
+ return TableEnvironment.create(
+
EnvironmentSettings.newInstance().withCatalogStore(catalogStore).build());
+ }
+}