This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 423e63a88 [flink] Refactor CALL procedure "sys.xxx_cluster_config" to 
support multiple config keys (#2279)
423e63a88 is described below

commit 423e63a88d9589de5484c17c2fc30bc3502c8cf2
Author: Pei Yu <[email protected]>
AuthorDate: Thu Jan 8 19:05:17 2026 +0800

    [flink] Refactor CALL procedure "sys.xxx_cluster_config" to support 
multiple config keys (#2279)
---
 ...cedure.java => GetClusterConfigsProcedure.java} |  39 +++--
 .../fluss/flink/procedure/ProcedureManager.java    |   5 +-
 .../procedure/ResetClusterConfigsProcedure.java    | 106 +++++++++++++
 .../flink/procedure/SetClusterConfigProcedure.java | 128 ---------------
 .../procedure/SetClusterConfigsProcedure.java      | 114 ++++++++++++++
 .../flink/procedure/FlinkProcedureITCase.java      | 173 ++++++++++++++++++---
 website/docs/engine-flink/procedures.md            |  85 ++++++----
 7 files changed, 454 insertions(+), 196 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigProcedure.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigsProcedure.java
similarity index 71%
rename from 
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigProcedure.java
rename to 
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigsProcedure.java
index 444f1dd13..0bdcfaff8 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigProcedure.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigsProcedure.java
@@ -30,6 +30,9 @@ import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * Procedure to get cluster configuration(s).
@@ -37,7 +40,7 @@ import java.util.List;
  * <p>This procedure allows querying dynamic cluster configurations. It can 
retrieve:
  *
  * <ul>
- *   <li>A specific configuration key
+ *   <li>multiple configurations
  *   <li>All configurations (when key parameter is null or empty)
  * </ul>
  *
@@ -45,39 +48,43 @@ import java.util.List;
  *
  * <pre>
  * -- Get a specific configuration
- * CALL sys.get_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec');
+ * CALL 
sys.get_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec');
+ *
+ * -- Get multiple configurations
+ * CALL 
sys.get_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', 
'datalake.format');
  *
  * -- Get all cluster configurations
- * CALL sys.get_cluster_config();
+ * CALL sys.get_cluster_configs();
  * </pre>
  */
-public class GetClusterConfigProcedure extends ProcedureBase {
+public class GetClusterConfigsProcedure extends ProcedureBase {
 
     @ProcedureHint(
             output =
                     @DataTypeHint(
                             "ROW<config_key STRING, config_value STRING, 
config_source STRING>"))
     public Row[] call(ProcedureContext context) throws Exception {
-        return getConfigs(null);
+        return getConfigs();
     }
 
     @ProcedureHint(
-            argument = {@ArgumentHint(name = "config_key", type = 
@DataTypeHint("STRING"))},
+            argument = {@ArgumentHint(name = "config_keys", type = 
@DataTypeHint("STRING"))},
+            isVarArgs = true,
             output =
                     @DataTypeHint(
                             "ROW<config_key STRING, config_value STRING, 
config_source STRING>"))
-    public Row[] call(ProcedureContext context, String configKey) throws 
Exception {
-        return getConfigs(configKey);
+    public Row[] call(ProcedureContext context, String... configKeys) throws 
Exception {
+        return getConfigs(configKeys);
     }
 
-    private Row[] getConfigs(@Nullable String configKey) throws Exception {
+    private Row[] getConfigs(@Nullable String... configKeys) throws Exception {
         try {
             // Get all cluster configurations
             Collection<ConfigEntry> configs = 
admin.describeClusterConfigs().get();
 
             List<Row> results = new ArrayList<>();
 
-            if (configKey == null || configKey.isEmpty()) {
+            if (configKeys == null || configKeys.length == 0) {
                 // Return all configurations
                 for (ConfigEntry entry : configs) {
                     results.add(
@@ -87,9 +94,14 @@ public class GetClusterConfigProcedure extends ProcedureBase 
{
                                     entry.source() != null ? 
entry.source().name() : "UNKNOWN"));
                 }
             } else {
-                // Find specific configuration
-                for (ConfigEntry entry : configs) {
-                    if (entry.key().equals(configKey)) {
+                // Find configurations
+                // The order of the results is the same as that of the key.
+                Map<String, ConfigEntry> configEntryMap =
+                        configs.stream()
+                                .collect(Collectors.toMap(ConfigEntry::key, 
Function.identity()));
+                for (String key : configKeys) {
+                    ConfigEntry entry = configEntryMap.get(key);
+                    if (null != entry) {
                         results.add(
                                 Row.of(
                                         entry.key(),
@@ -97,7 +109,6 @@ public class GetClusterConfigProcedure extends ProcedureBase 
{
                                         entry.source() != null
                                                 ? entry.source().name()
                                                 : "UNKNOWN"));
-                        break;
                     }
                 }
             }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java
index 6c7fdb2ec..b97b3cd8e 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java
@@ -70,8 +70,9 @@ public class ProcedureManager {
         ADD_ACL("sys.add_acl", AddAclProcedure.class),
         DROP_ACL("sys.drop_acl", DropAclProcedure.class),
         List_ACL("sys.list_acl", ListAclProcedure.class),
-        SET_CLUSTER_CONFIG("sys.set_cluster_config", 
SetClusterConfigProcedure.class),
-        GET_CLUSTER_CONFIG("sys.get_cluster_config", 
GetClusterConfigProcedure.class),
+        SET_CLUSTER_CONFIGS("sys.set_cluster_configs", 
SetClusterConfigsProcedure.class),
+        GET_CLUSTER_CONFIGS("sys.get_cluster_configs", 
GetClusterConfigsProcedure.class),
+        RESET_CLUSTER_CONFIGS("sys.reset_cluster_configs", 
ResetClusterConfigsProcedure.class),
         ADD_SERVER_TAG("sys.add_server_tag", AddServerTagProcedure.class),
         REMOVE_SERVER_TAG("sys.remove_server_tag", 
RemoveServerTagProcedure.class),
         REBALANCE("sys.rebalance", RebalanceProcedure.class),
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java
new file mode 100644
index 000000000..88e0c297b
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.procedure;
+
+import org.apache.fluss.config.cluster.AlterConfig;
+import org.apache.fluss.config.cluster.AlterConfigOpType;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Procedure to reset cluster configurations to their default values.
+ *
+ * <p>This procedure reverts the configurations to their initial system 
defaults. The changes are:
+ *
+ * <ul>
+ *   <li>Validated by the CoordinatorServer before persistence
+ *   <li>Persisted in ZooKeeper for durability
+ *   <li>Applied to all relevant servers (Coordinator and TabletServers)
+ *   <li>Survive server restarts
+ * </ul>
+ *
+ * <p>Usage examples:
+ *
+ * <pre>
+ * -- reset a configuration
+ * CALL 
sys.reset_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec');
+ *
+ * -- reset multiple configurations at one time
+ * CALL 
sys.reset_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', 
'datalake.format');
+ *
+ * </pre>
+ *
+ * <p><b>Note:</b> In theory, an operation like <b>Reset to default value</b> 
should always succeed,
+ * as the default value should be a valid one
+ */
+public class ResetClusterConfigsProcedure extends ProcedureBase {
+
+    @ProcedureHint(
+            argument = {@ArgumentHint(name = "config_keys", type = 
@DataTypeHint("STRING"))},
+            isVarArgs = true)
+    public String[] call(ProcedureContext context, String... configKeys) 
throws Exception {
+        try {
+            // Validate config key
+            if (configKeys.length == 0) {
+                throw new IllegalArgumentException(
+                        "config_keys cannot be null or empty. "
+                                + "Please specify valid configuration keys.");
+            }
+
+            List<AlterConfig> configList = new ArrayList<>();
+            List<String> resultMessage = new ArrayList<>();
+
+            for (String key : configKeys) {
+                String configKey = key.trim();
+                if (configKey.isEmpty()) {
+                    throw new IllegalArgumentException(
+                            "Config key cannot be null or empty. "
+                                    + "Please specify a valid configuration 
key.");
+                }
+
+                String operationDesc = "deleted (reset to default)";
+
+                AlterConfig alterConfig =
+                        new AlterConfig(configKey, null, 
AlterConfigOpType.DELETE);
+                configList.add(alterConfig);
+                resultMessage.add(
+                        String.format(
+                                "Successfully %s configuration '%s'. ", 
operationDesc, configKey));
+            }
+
+            // Call Admin API to modify cluster configuration
+            // This will trigger validation on CoordinatorServer before 
persistence
+            admin.alterClusterConfigs(configList).get();
+
+            return resultMessage.toArray(new String[0]);
+        } catch (IllegalArgumentException e) {
+            // Re-throw validation errors with original message
+            throw e;
+        } catch (Exception e) {
+            // Wrap other exceptions with more context
+            throw new RuntimeException(
+                    String.format("Failed to reset cluster config: %s", 
e.getMessage()), e);
+        }
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigProcedure.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigProcedure.java
deleted file mode 100644
index ee4dbcf37..000000000
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigProcedure.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.flink.procedure;
-
-import org.apache.fluss.config.cluster.AlterConfig;
-import org.apache.fluss.config.cluster.AlterConfigOpType;
-
-import org.apache.flink.table.annotation.ArgumentHint;
-import org.apache.flink.table.annotation.DataTypeHint;
-import org.apache.flink.table.annotation.ProcedureHint;
-import org.apache.flink.table.procedure.ProcedureContext;
-
-import javax.annotation.Nullable;
-
-import java.util.Collections;
-
-/**
- * Procedure to set or delete cluster configuration dynamically.
- *
- * <p>This procedure allows modifying dynamic cluster configurations. The 
changes are:
- *
- * <ul>
- *   <li>Validated by the CoordinatorServer before persistence
- *   <li>Persisted in ZooKeeper for durability
- *   <li>Applied to all relevant servers (Coordinator and TabletServers)
- *   <li>Survive server restarts
- * </ul>
- *
- * <p>Usage examples:
- *
- * <pre>
- * -- Set a configuration
- * CALL sys.set_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec', 
'200MB');
- * CALL sys.set_cluster_config('datalake.format', 'paimon');
- *
- * -- Delete a configuration (reset to default)
- * CALL sys.set_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec', 
NULL);
- * CALL sys.set_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec', 
'');
- * </pre>
- *
- * <p><b>Note:</b> Not all configurations support dynamic changes. The server 
will validate the
- * change and reject it if the configuration cannot be modified dynamically or 
if the new value is
- * invalid.
- */
-public class SetClusterConfigProcedure extends ProcedureBase {
-
-    @ProcedureHint(argument = {@ArgumentHint(name = "config_key", type = 
@DataTypeHint("STRING"))})
-    public String[] call(ProcedureContext context, String configKey) throws 
Exception {
-        return performSet(configKey, null);
-    }
-
-    @ProcedureHint(
-            argument = {
-                @ArgumentHint(name = "config_key", type = 
@DataTypeHint("STRING")),
-                @ArgumentHint(name = "config_value", type = 
@DataTypeHint("STRING"))
-            })
-    public String[] call(ProcedureContext context, String configKey, String 
configValue)
-            throws Exception {
-        return performSet(configKey, configValue);
-    }
-
-    private String[] performSet(String configKey, @Nullable String 
configValue) throws Exception {
-
-        try {
-            // Validate config key
-            if (configKey == null || configKey.trim().isEmpty()) {
-                throw new IllegalArgumentException(
-                        "Config key cannot be null or empty. "
-                                + "Please specify a valid configuration key.");
-            }
-
-            configKey = configKey.trim();
-
-            // Determine operation type
-            AlterConfigOpType opType;
-            String operationDesc;
-
-            if (configValue == null || configValue.trim().isEmpty()) {
-                // Delete operation - reset to default
-                opType = AlterConfigOpType.DELETE;
-                operationDesc = "deleted (reset to default)";
-            } else {
-                // Set operation
-                opType = AlterConfigOpType.SET;
-                operationDesc = String.format("set to '%s'", configValue);
-            }
-
-            // Construct configuration modification operation.
-            AlterConfig alterConfig = new AlterConfig(configKey, configValue, 
opType);
-
-            // Call Admin API to modify cluster configuration
-            // This will trigger validation on CoordinatorServer before 
persistence
-            
admin.alterClusterConfigs(Collections.singletonList(alterConfig)).get();
-
-            return new String[] {
-                String.format(
-                        "Successfully %s configuration '%s'. "
-                                + "The change is persisted in ZooKeeper and 
applied to all servers.",
-                        operationDesc, configKey)
-            };
-
-        } catch (IllegalArgumentException e) {
-            // Re-throw validation errors with original message
-            throw e;
-        } catch (Exception e) {
-            // Wrap other exceptions with more context
-            throw new RuntimeException(
-                    String.format(
-                            "Failed to set cluster config '%s': %s", 
configKey, e.getMessage()),
-                    e);
-        }
-    }
-}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigsProcedure.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigsProcedure.java
new file mode 100644
index 000000000..7246706af
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigsProcedure.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.procedure;
+
+import org.apache.fluss.config.cluster.AlterConfig;
+import org.apache.fluss.config.cluster.AlterConfigOpType;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Procedure to set or delete cluster configuration dynamically.
+ *
+ * <p>This procedure allows modifying dynamic cluster configurations. The 
changes are:
+ *
+ * <ul>
+ *   <li>Validated by the CoordinatorServer before persistence
+ *   <li>Persisted in ZooKeeper for durability
+ *   <li>Applied to all relevant servers (Coordinator and TabletServers)
+ *   <li>Survive server restarts
+ * </ul>
+ *
+ * <p>Usage examples:
+ *
+ * <pre>
+ * -- Set a configuration
+ * CALL 
sys.set_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', 
'200MB');
+ * CALL sys.set_cluster_configs('datalake.format', 'paimon');
+ *
+ * -- Set multiple configurations at one time
+ * CALL 
sys.set_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', 
'200MB','datalake.format', 'paimon');
+ * </pre>
+ *
+ * <p><b>Note:</b> Not all configurations support dynamic changes. The server 
will validate the
+ * change and reject it if the configuration cannot be modified dynamically or 
if the new value is
+ * invalid.
+ */
+public class SetClusterConfigsProcedure extends ProcedureBase {
+
+    @ProcedureHint(
+            argument = {@ArgumentHint(name = "config_pairs", type = 
@DataTypeHint("STRING"))},
+            isVarArgs = true)
+    public String[] call(ProcedureContext context, String... configPairs) 
throws Exception {
+        try {
+            // Validate config key
+            if (configPairs.length == 0) {
+                throw new IllegalArgumentException(
+                        "config_pairs cannot be null or empty. "
+                                + "Please specify a valid configuration 
pairs.");
+            }
+
+            if (configPairs.length % 2 != 0) {
+                throw new IllegalArgumentException(
+                        "config_pairs must be set in pairs. "
+                                + "Please specify a valid configuration 
pairs.");
+            }
+            List<AlterConfig> configList = new ArrayList<>();
+            List<String> resultMessage = new ArrayList<>();
+
+            for (int i = 0; i < configPairs.length; i += 2) {
+                String configKey = configPairs[i].trim();
+                if (configKey.isEmpty()) {
+                    throw new IllegalArgumentException(
+                            "Config key cannot be null or empty. "
+                                    + "Please specify a valid configuration 
key.");
+                }
+                String configValue = configPairs[i + 1];
+
+                String operationDesc = String.format("set to '%s'", 
configValue);
+
+                // Construct configuration modification operation.
+                AlterConfig alterConfig =
+                        new AlterConfig(configKey, configValue, 
AlterConfigOpType.SET);
+                configList.add(alterConfig);
+                resultMessage.add(
+                        String.format(
+                                "Successfully %s configuration '%s'. ", 
operationDesc, configKey));
+            }
+
+            // Call Admin API to modify cluster configuration
+            // This will trigger validation on CoordinatorServer before 
persistence
+            admin.alterClusterConfigs(configList).get();
+
+            return resultMessage.toArray(new String[0]);
+        } catch (IllegalArgumentException e) {
+            // Re-throw validation errors with original message
+            throw e;
+        } catch (Exception e) {
+            // Wrap other exceptions with more context
+            throw new RuntimeException(
+                    String.format("Failed to set cluster config: %s", 
e.getMessage()), e);
+        }
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java
index 8b595477d..cc15d5ccd 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java
@@ -131,9 +131,10 @@ public abstract class FlinkProcedureITCase {
                     Arrays.asList(
                             "+I[sys.add_acl]",
                             "+I[sys.drop_acl]",
-                            "+I[sys.get_cluster_config]",
+                            "+I[sys.get_cluster_configs]",
                             "+I[sys.list_acl]",
-                            "+I[sys.set_cluster_config]",
+                            "+I[sys.set_cluster_configs]",
+                            "+I[sys.reset_cluster_configs]",
                             "+I[sys.add_server_tag]",
                             "+I[sys.remove_server_tag]",
                             "+I[sys.rebalance]",
@@ -299,12 +300,12 @@ public abstract class FlinkProcedureITCase {
     }
 
     @Test
-    void testGetClusterConfig() throws Exception {
+    void testGetClusterConfigs() throws Exception {
         // Get specific config
         try (CloseableIterator<Row> resultIterator =
                 tEnv.executeSql(
                                 String.format(
-                                        "Call %s.sys.get_cluster_config('%s')",
+                                        "Call 
%s.sys.get_cluster_configs('%s')",
                                         CATALOG_NAME,
                                         
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()))
                         .collect()) {
@@ -317,9 +318,34 @@ public abstract class FlinkProcedureITCase {
             assertThat(row.getField(2)).isNotNull(); // config_source
         }
 
+        // Get multiple config
+        try (CloseableIterator<Row> resultIterator =
+                tEnv.executeSql(
+                                String.format(
+                                        "Call %s.sys.get_cluster_configs('%s', 
'%s')",
+                                        CATALOG_NAME,
+                                        
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(),
+                                        ConfigOptions.DATALAKE_FORMAT.key()))
+                        .collect()) {
+            List<Row> results = CollectionUtil.iteratorToList(resultIterator);
+            assertThat(results).hasSize(2);
+            // the first row
+            Row row0 = results.get(0);
+            assertThat(row0.getField(0))
+                    
.isEqualTo(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key());
+            assertThat(row0.getField(1)).isEqualTo("100 mb");
+            assertThat(row0.getField(2)).isNotNull(); // config_source
+
+            // the second row
+            Row row1 = results.get(1);
+            
assertThat(row1.getField(0)).isEqualTo(ConfigOptions.DATALAKE_FORMAT.key());
+            assertThat(row1.getField(1)).isEqualTo("paimon");
+            assertThat(row1.getField(2)).isNotNull(); // config_source
+        }
+
         // Get all configs
         try (CloseableIterator<Row> resultIterator =
-                tEnv.executeSql(String.format("Call 
%s.sys.get_cluster_config()", CATALOG_NAME))
+                tEnv.executeSql(String.format("Call 
%s.sys.get_cluster_configs()", CATALOG_NAME))
                         .collect()) {
             List<Row> results = CollectionUtil.iteratorToList(resultIterator);
             assertThat(results).isNotEmpty();
@@ -329,7 +355,7 @@ public abstract class FlinkProcedureITCase {
         try (CloseableIterator<Row> resultIterator =
                 tEnv.executeSql(
                                 String.format(
-                                        "Call 
%s.sys.get_cluster_config('non.existent.config')",
+                                        "Call 
%s.sys.get_cluster_configs('non.existent.config')",
                                         CATALOG_NAME))
                         .collect()) {
             List<Row> results = CollectionUtil.iteratorToList(resultIterator);
@@ -339,19 +365,19 @@ public abstract class FlinkProcedureITCase {
         // reset cluster configs.
         tEnv.executeSql(
                         String.format(
-                                "Call %s.sys.set_cluster_config('%s')",
+                                "Call %s.sys.reset_cluster_configs('%s')",
                                 CATALOG_NAME,
                                 
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()))
                 .await();
     }
 
     @Test
-    void testSetClusterConfig() throws Exception {
+    void testSetClusterConfigs() throws Exception {
         // Test setting a valid config
         try (CloseableIterator<Row> resultIterator =
                 tEnv.executeSql(
                                 String.format(
-                                        "Call %s.sys.set_cluster_config('%s', 
'200MB')",
+                                        "Call %s.sys.set_cluster_configs('%s', 
'200MB')",
                                         CATALOG_NAME,
                                         
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()))
                         .collect()) {
@@ -363,66 +389,169 @@ public abstract class FlinkProcedureITCase {
                     
.contains(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key());
         }
 
+        // Test setting multiple configs
+        try (CloseableIterator<Row> resultIterator =
+                tEnv.executeSql(
+                                String.format(
+                                        "Call %s.sys.set_cluster_configs('%s', 
'300MB', '%s', 'paimon')",
+                                        CATALOG_NAME,
+                                        
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(),
+                                        ConfigOptions.DATALAKE_FORMAT.key()))
+                        .collect()) {
+            List<Row> results = CollectionUtil.iteratorToList(resultIterator);
+            assertThat(results).hasSize(2);
+            assertThat(results.get(0).getField(0))
+                    .asString()
+                    .contains("Successfully set to '300MB'")
+                    
.contains(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key());
+
+            assertThat(results.get(1).getField(0))
+                    .asString()
+                    .contains("Successfully set to 'paimon'")
+                    .contains(ConfigOptions.DATALAKE_FORMAT.key());
+        }
+
         // Verify the config was actually set
         try (CloseableIterator<Row> resultIterator =
                 tEnv.executeSql(
                                 String.format(
-                                        "Call %s.sys.get_cluster_config('%s')",
+                                        "Call 
%s.sys.get_cluster_configs('%s')",
                                         CATALOG_NAME,
                                         
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()))
                         .collect()) {
             List<Row> results = CollectionUtil.iteratorToList(resultIterator);
             assertThat(results).hasSize(1);
-            assertThat(results.get(0).getField(1)).isEqualTo("200MB");
+            assertThat(results.get(0).getField(1)).isEqualTo("300MB");
         }
 
         // reset cluster configs.
         tEnv.executeSql(
                         String.format(
-                                "Call %s.sys.set_cluster_config('%s')",
+                                "Call %s.sys.reset_cluster_configs('%s', 
'%s')",
                                 CATALOG_NAME,
-                                
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()))
+                                
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(),
+                                ConfigOptions.DATALAKE_FORMAT.key()))
                 .await();
     }
 
     @Test
-    void testDeleteClusterConfig() throws Exception {
+    void testResetClusterConfigs() throws Exception {
         // First set a config
         tEnv.executeSql(
                         String.format(
-                                "Call %s.sys.set_cluster_config('%s', 
'paimon')",
-                                CATALOG_NAME, 
ConfigOptions.DATALAKE_FORMAT.key()))
+                                "Call %s.sys.set_cluster_configs('%s', 
'paimon', '%s', '200MB')",
+                                CATALOG_NAME,
+                                ConfigOptions.DATALAKE_FORMAT.key(),
+                                
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()))
                 .await();
 
-        // Delete the config (reset to default) - omitting the value parameter
+        // Delete the config (reset to default)
         try (CloseableIterator<Row> resultIterator =
                 tEnv.executeSql(
                                 String.format(
-                                        "Call %s.sys.set_cluster_config('%s')",
-                                        CATALOG_NAME, 
ConfigOptions.DATALAKE_FORMAT.key()))
+                                        "Call 
%s.sys.reset_cluster_configs('%s', '%s')",
+                                        CATALOG_NAME,
+                                        ConfigOptions.DATALAKE_FORMAT.key(),
+                                        
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()))
                         .collect()) {
             List<Row> results = CollectionUtil.iteratorToList(resultIterator);
-            assertThat(results).hasSize(1);
+            assertThat(results).hasSize(2);
             assertThat(results.get(0).getField(0))
                     .asString()
                     .contains("Successfully deleted")
                     .contains(ConfigOptions.DATALAKE_FORMAT.key());
+
+            assertThat(results.get(1).getField(0))
+                    .asString()
+                    .contains("Successfully deleted")
+                    
.contains(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key());
         }
     }
 
     @Test
-    void testSetClusterConfigValidation() throws Exception {
+    void testSetClusterConfigsValidation() throws Exception {
         // Try to set an invalid config (not allowed for dynamic change)
         assertThatThrownBy(
                         () ->
                                 tEnv.executeSql(
                                                 String.format(
-                                                        "Call 
%s.sys.set_cluster_config('invalid.config.key', 'value')",
+                                                        "Call 
%s.sys.set_cluster_configs('invalid.config.key', 'value')",
                                                         CATALOG_NAME))
                                         .await())
                 .rootCause()
+                // TODO: Fix misleading error: non-existent key reported as 
not allowed.
                 .hasMessageContaining(
                         "The config key invalid.config.key is not allowed to 
be changed dynamically");
+
+        // Validation to ensure an even number of arguments is passed
+        assertThatThrownBy(
+                        () ->
+                                tEnv.executeSql(
+                                                String.format(
+                                                        "Call 
%s.sys.set_cluster_configs('%s')",
+                                                        CATALOG_NAME,
+                                                        ConfigOptions
+                                                                
.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC
+                                                                .key()))
+                                        .await())
+                .rootCause()
+                .hasMessageContaining(
+                        "config_pairs must be set in pairs. Please specify a 
valid configuration pairs");
+
+        // Test calling with no parameters passed
+        assertThatThrownBy(
+                        () ->
+                                tEnv.executeSql(
+                                                String.format(
+                                                        "Call 
%s.sys.set_cluster_configs()",
+                                                        CATALOG_NAME))
+                                        .await())
+                .rootCause()
+                .hasMessageContaining(
+                        "config_pairs cannot be null or empty. Please specify 
a valid configuration pairs");
+
+        // Test mismatched key-value pairs in the input parameters.
+        assertThatThrownBy(
+                        () ->
+                                tEnv.executeSql(
+                                                String.format(
+                                                        "Call 
%s.sys.set_cluster_configs('%s', 'paimon')",
+                                                        CATALOG_NAME,
+                                                        ConfigOptions
+                                                                
.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC
+                                                                .key()))
+                                        .await())
+                .rootCause()
+                .hasMessageContaining(
+                        "Could not parse value 'paimon' for key 
'kv.rocksdb.shared-rate-limiter.bytes-per-sec'");
+    }
+
+    @Test
+    void testResetClusterConfigsValidation() throws Exception {
+        // Try to reset an invalid config
+        assertThatThrownBy(
+                        () ->
+                                tEnv.executeSql(
+                                                String.format(
+                                                        "Call 
%s.sys.reset_cluster_configs('invalid.config.key')",
+                                                        CATALOG_NAME))
+                                        .await())
+                .rootCause()
+                // TODO: Fix misleading error: non-existent key reported as 
not allowed.
+                .hasMessageContaining(
+                        "The config key invalid.config.key is not allowed to 
be changed dynamically");
+
+        // Test calling with no parameters passed
+        assertThatThrownBy(
+                        () ->
+                                tEnv.executeSql(
+                                                String.format(
+                                                        "Call 
%s.sys.reset_cluster_configs()",
+                                                        CATALOG_NAME))
+                                        .await())
+                .rootCause()
+                .hasMessageContaining(
+                        "config_keys cannot be null or empty. Please specify 
valid configuration keys");
     }
 
     @ParameterizedTest
diff --git a/website/docs/engine-flink/procedures.md 
b/website/docs/engine-flink/procedures.md
index 95e8724eb..eeab4496f 100644
--- a/website/docs/engine-flink/procedures.md
+++ b/website/docs/engine-flink/procedures.md
@@ -162,23 +162,23 @@ CALL sys.list_acl(
 
 Fluss provides procedures to dynamically manage cluster configurations without 
requiring a server restart.
 
-### get_cluster_config
+### get_cluster_configs
 
 Retrieve cluster configuration values.
 
 **Syntax:**
 
 ```sql
--- Get a specific configuration
-CALL [catalog_name.]sys.get_cluster_config(config_key => 'STRING')
+-- Get multiple configurations
+CALL [catalog_name.]sys.get_cluster_configs(config_keys => 'key1' [, 'key2', 
...])
 
 -- Get all cluster configurations
-CALL [catalog_name.]sys.get_cluster_config()
+CALL [catalog_name.]sys.get_cluster_configs()
 ```
 
 **Parameters:**
 
-- `config_key` (optional): The configuration key to retrieve. If omitted, 
returns all cluster configurations.
+- `config_keys` (optional): The configuration keys to retrieve. If omitted, 
returns all cluster configurations.
 
 **Returns:** A table with columns:
 - `config_key`: The configuration key name
@@ -192,35 +192,35 @@ CALL [catalog_name.]sys.get_cluster_config()
 USE fluss_catalog;
 
 -- Get a specific configuration
-CALL sys.get_cluster_config(
-  config_key => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec'
+CALL sys.get_cluster_configs(
+  config_keys => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec'
+);
+
+-- Get multiple configurations
+CALL sys.get_cluster_configs(
+  config_keys => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec', 
'datalake.format'
 );
 
 -- Get all cluster configurations
-CALL sys.get_cluster_config();
+CALL sys.get_cluster_configs();
 ```
 
-### set_cluster_config
+### set_cluster_configs
 
-Set or delete a cluster configuration dynamically.
+Set cluster configurations dynamically.
 
 **Syntax:**
 
 ```sql
--- Set a configuration value
-CALL [catalog_name.]sys.set_cluster_config(
-  config_key => 'STRING',
-  config_value => 'STRING'
+-- Set configuration values
+CALL [catalog_name.]sys.set_cluster_configs(
+  config_pairs => 'key1', 'value1' [, 'key2', 'value2' ...]
 )
-
--- Delete a configuration (reset to default)
-CALL [catalog_name.]sys.set_cluster_config(config_key => 'STRING')
 ```
 
 **Parameters:**
 
-- `config_key` (required): The configuration key to modify.
-- `config_value` (optional): The new value to set. If omitted or empty, the 
configuration is deleted (reset to default).
+- `config_pairs` (required): Key-value pairs of configuration items; the 
number of arguments must be even.
 
 **Important Notes:**
 
@@ -236,20 +236,45 @@ CALL [catalog_name.]sys.set_cluster_config(config_key => 
'STRING')
 USE fluss_catalog;
 
 -- Set RocksDB rate limiter
-CALL sys.set_cluster_config(
-  config_key => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec',
-  config_value => '200MB'
+CALL sys.set_cluster_configs(
+  config_pairs => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB'
 );
 
--- Set datalake format
-CALL sys.set_cluster_config(
-  config_key => 'datalake.format',
-  config_value => 'paimon'
+-- Set RocksDB rate limiter and datalake format
+CALL sys.set_cluster_configs(
+  config_pairs => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB', 
'datalake.format', 'paimon'
 );
+```
 
--- Delete a configuration (reset to default)
-CALL sys.set_cluster_config(
-  config_key => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec'
-);
+### reset_cluster_configs
+
+Reset cluster configurations dynamically.
+
+**Syntax:**
+
+```sql
+-- Reset configuration values
+CALL [catalog_name.]sys.reset_cluster_configs(config_keys => 'key1' [, 'key2', 
...])
 ```
 
+**Parameters:**
+
+- `config_keys` (required): The configuration keys to reset.
+
+
+**Example:**
+
+```sql title="Flink SQL"
+-- Use the Fluss catalog (replace 'fluss_catalog' with your catalog name if 
different)
+USE fluss_catalog;
+
+-- Reset a specific configuration
+CALL sys.reset_cluster_configs(
+  config_keys => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec'
+);
+
+-- Reset RocksDB rate limiter and datalake format
+CALL sys.reset_cluster_configs(
+  config_keys => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec', 
'datalake.format'
+);
+```
\ No newline at end of file


Reply via email to