platinumhamburg commented on code in PR #2279:
URL: https://github.com/apache/fluss/pull/2279#discussion_r2660528601


##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java:
##########
@@ -265,12 +266,12 @@ void testDisableAuthorization() throws Exception {
     }
 
     @Test
-    void testGetClusterConfig() throws Exception {
+    void testGetClusterConfigs() throws Exception {

Review Comment:
   This test case needs broader coverage, including:
   - retrieving multiple configurations in a single call;
   - requesting configurations that do not exist.
   
   
   



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigsProcedure.java:
##########
@@ -45,84 +44,76 @@
  *
  * <pre>
  * -- Set a configuration
- * CALL sys.set_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB');
- * CALL sys.set_cluster_config('datalake.format', 'paimon');
+ * CALL sys.set_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB');
+ * CALL sys.set_cluster_configs('datalake.format', 'paimon');
+ *
+ * -- Set multiple configurations at one time
+ * CALL sys.set_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB','datalake.format', 'paimon');
  *
  * -- Delete a configuration (reset to default)
- * CALL sys.set_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec', NULL);
- * CALL sys.set_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '');
+ * CALL sys.set_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '');
  * </pre>
  *
  * <p><b>Note:</b> Not all configurations support dynamic changes. The server will validate the
  * change and reject it if the configuration cannot be modified dynamically or if the new value is
  * invalid.
  */
-public class SetClusterConfigProcedure extends ProcedureBase {
-
-    @ProcedureHint(argument = {@ArgumentHint(name = "config_key", type = @DataTypeHint("STRING"))})
-    public String[] call(ProcedureContext context, String configKey) throws Exception {
-        return performSet(configKey, null);
-    }
+public class SetClusterConfigsProcedure extends ProcedureBase {
 
     @ProcedureHint(
-            argument = {
-                @ArgumentHint(name = "config_key", type = @DataTypeHint("STRING")),
-                @ArgumentHint(name = "config_value", type = @DataTypeHint("STRING"))
-            })
-    public String[] call(ProcedureContext context, String configKey, String configValue)
-            throws Exception {
-        return performSet(configKey, configValue);
-    }
-
-    private String[] performSet(String configKey, @Nullable String configValue) throws Exception {
-
+            argument = {@ArgumentHint(name = "config_pairs", type = @DataTypeHint("STRING"))},
+            isVarArgs = true)
+    public String[] call(ProcedureContext context, String... configPairs) throws Exception {
         try {
             // Validate config key
-            if (configKey == null || configKey.trim().isEmpty()) {
+            if (configPairs.length == 0) {
                 throw new IllegalArgumentException(
-                        "Config key cannot be null or empty. "
-                                + "Please specify a valid configuration key.");
+                        "config_pairs cannot be null or empty. "
+                                + "Please specify a valid configuration pairs.");
             }
 
-            configKey = configKey.trim();
-
-            // Determine operation type
-            AlterConfigOpType opType;
-            String operationDesc;
-
-            if (configValue == null || configValue.trim().isEmpty()) {
-                // Delete operation - reset to default
-                opType = AlterConfigOpType.DELETE;
-                operationDesc = "deleted (reset to default)";
-            } else {
-                // Set operation
-                opType = AlterConfigOpType.SET;
-                operationDesc = String.format("set to '%s'", configValue);
+            if (configPairs.length % 2 != 0) {
+                throw new IllegalArgumentException(
+                        "config_pairs must be set in pairs. "
+                                + "Please specify a valid configuration pairs.");
+            }
+            List<AlterConfig> configList = new ArrayList<>();
+            StringBuilder resultMessage = new StringBuilder();
+
+            for (int i = 0; i < configPairs.length; i += 2) {
+                String configKey = configPairs[i].trim();
+                if (configKey.isEmpty()) {
+                    throw new IllegalArgumentException(
+                            "Config key cannot be null or empty. "
+                                    + "Please specify a valid configuration key.");
+                }
+                String configValue = configPairs[i + 1];
+
+                String operationDesc = String.format("set to '%s'", configValue);
+
+                // Construct configuration modification operation.
+                AlterConfig alterConfig =
+                        new AlterConfig(configKey, configValue, AlterConfigOpType.SET);
+                configList.add(alterConfig);
+                resultMessage.append(
+                        String.format(
+                                "Successfully %s configuration '%s'. ", operationDesc, configKey));
             }
-
-            // Construct configuration modification operation.
-            AlterConfig alterConfig = new AlterConfig(configKey, configValue, opType);
 
             // Call Admin API to modify cluster configuration
             // This will trigger validation on CoordinatorServer before persistence
-            admin.alterClusterConfigs(Collections.singletonList(alterConfig)).get();
+            admin.alterClusterConfigs(configList).get();
 
             return new String[] {
-                String.format(
-                        "Successfully %s configuration '%s'. "
-                                + "The change is persisted in ZooKeeper and applied to all servers.",
-                        operationDesc, configKey)
+                resultMessage + "The change is persisted in ZooKeeper and applied to all servers."

Review Comment:
   The current resultMessage concatenates the results of all operations into a
   single line, which is hard to read. Since the return type is String[], please
   refactor the code to return the results in a more readable form, e.g. one
   entry per configuration.
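
   A minimal sketch of one possible shape, not a definitive implementation. It
   assumes a hypothetical helper `buildSetMessages` (the class and method names
   are illustrative and not part of this PR) that returns one array element per
   key/value pair followed by a closing summary line. Validation is simplified
   and the `admin.alterClusterConfigs` call is omitted:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of the PR: one result line per configuration. */
public class SetConfigMessages {

    static String[] buildSetMessages(String... configPairs) {
        // Simplified version of the argument checks in the procedure.
        if (configPairs.length == 0 || configPairs.length % 2 != 0) {
            throw new IllegalArgumentException(
                    "config_pairs must be non-empty and set in pairs.");
        }
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < configPairs.length; i += 2) {
            String key = configPairs[i].trim();
            String value = configPairs[i + 1];
            // One entry per configuration instead of one concatenated string.
            messages.add(
                    String.format("Successfully set configuration '%s' to '%s'.", key, value));
        }
        // Shared trailer emitted once, as its own row.
        messages.add("The changes are persisted in ZooKeeper and applied to all servers.");
        return messages.toArray(new String[0]);
    }

    public static void main(String[] args) {
        String[] lines =
                buildSetMessages(
                        "kv.rocksdb.shared-rate-limiter.bytes-per-sec", "200MB",
                        "datalake.format", "paimon");
        for (String line : lines) {
            System.out.println(line);
        }
    }
}
```

   With this shape each configuration would show up as its own row in the
   procedure result, and the same approach could be reused for the reset
   procedure.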
   
   



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.procedure;
+
+import org.apache.fluss.config.cluster.AlterConfig;
+import org.apache.fluss.config.cluster.AlterConfigOpType;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Procedure to reset cluster configuration dynamically.
+ *
+ * <p>This procedure allows modifying dynamic cluster configurations. The changes are:
+ *
+ * <ul>
+ *   <li>Validated by the CoordinatorServer before persistence
+ *   <li>Persisted in ZooKeeper for durability
+ *   <li>Applied to all relevant servers (Coordinator and TabletServers)
+ *   <li>Survive server restarts
+ * </ul>
+ *
+ * <p>Usage examples:
+ *
+ * <pre>
+ * -- reset a configuration
+ * CALL sys.reset_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec');
+ *
+ * -- reset multiple configurations at one time
+ * CALL sys.reset_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', 'datalake.format');
+ *
+ * </pre>
+ *
+ * <p><b>Note:</b> Not all configurations support dynamic changes. The server will validate the
+ * change and reject it if the configuration cannot be reset dynamically.
+ */
+public class ResetClusterConfigsProcedure extends ProcedureBase {
+
+    @ProcedureHint(
+            argument = {@ArgumentHint(name = "config_keys", type = @DataTypeHint("STRING"))},
+            isVarArgs = true)
+    public String[] call(ProcedureContext context, String... configKeys) throws Exception {
+        try {
+            // Validate config key
+            if (configKeys.length == 0) {
+                throw new IllegalArgumentException(
+                        "config_keys cannot be null or empty. "
+                                + "Please specify valid configuration keys.");
+            }
+
+            List<AlterConfig> configList = new ArrayList<>();
+            StringBuilder resultMessage = new StringBuilder();
+
+            for (String key : configKeys) {
+                String configKey = key.trim();
+                if (configKey.isEmpty()) {
+                    throw new IllegalArgumentException(
+                            "Config key cannot be null or empty. "
+                                    + "Please specify a valid configuration key.");
+                }
+
+                String operationDesc = "deleted (reset to default)";
+
+                AlterConfig alterConfig =
+                        new AlterConfig(configKey, null, AlterConfigOpType.DELETE);
+                configList.add(alterConfig);
+                resultMessage.append(
+                        String.format(
+                                "Successfully %s configuration '%s'. ", operationDesc, configKey));
+            }
+
+            // Call Admin API to modify cluster configuration
+            // This will trigger validation on CoordinatorServer before persistence
+            admin.alterClusterConfigs(configList).get();
+
+            return new String[] {
+                resultMessage + "The change is persisted in ZooKeeper and applied to all servers."

Review Comment:
   Ditto: please improve the returned message. Do not aggregate the results of
   all requests into a single line.
   
   



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java:
##########
@@ -0,0 +1,108 @@
+/**
+ * Procedure to reset cluster configuration dynamically.
+ *
+ * <p>This procedure allows modifying dynamic cluster configurations. The changes are:
+ *
+ * <ul>
+ *   <li>Validated by the CoordinatorServer before persistence
+ *   <li>Persisted in ZooKeeper for durability
+ *   <li>Applied to all relevant servers (Coordinator and TabletServers)
+ *   <li>Survive server restarts
+ * </ul>
+ *
+ * <p>Usage examples:
+ *
+ * <pre>
+ * -- reset a configuration
+ * CALL sys.reset_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec');
+ *
+ * -- reset multiple configurations at one time
+ * CALL sys.reset_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', 'datalake.format');
+ *
+ * </pre>
+ *
+ * <p><b>Note:</b> Not all configurations support dynamic changes. The server will validate the

Review Comment:
   ditto
   



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java:
##########
@@ -0,0 +1,108 @@
+/**
+ * Procedure to reset cluster configuration dynamically.
+ *
+ * <p>This procedure allows modifying dynamic cluster configurations. The changes are:
+ *
+ * <ul>
+ *   <li>Validated by the CoordinatorServer before persistence

Review Comment:
   The Javadoc description should be updated. In theory, a "reset to default
   value" operation should always succeed, since the default value is expected
   to be valid.
   
   



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigsProcedure.java:
##########
@@ -45,84 +44,76 @@
  *
  * <pre>
  * -- Set a configuration
- * CALL sys.set_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB');
- * CALL sys.set_cluster_config('datalake.format', 'paimon');
+ * CALL sys.set_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB');
+ * CALL sys.set_cluster_configs('datalake.format', 'paimon');
+ *
+ * -- Set multiple configurations at one time
+ * CALL sys.set_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB','datalake.format', 'paimon');
  *
  * -- Delete a configuration (reset to default)
- * CALL sys.set_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec', NULL);
- * CALL sys.set_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '');
+ * CALL sys.set_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '');

Review Comment:
   Please remove this example; this usage is not supported.



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java:
##########


Review Comment:
   This test case needs broader coverage, including:
   - no parameters passed;
   - mismatched key-value pairs in the input parameters.
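
   A rough sketch of the two scenarios, under stated assumptions. It uses a
   hypothetical standalone `validatePairs` helper that mirrors the argument
   checks in `SetClusterConfigsProcedure.call`, rather than the real Flink test
   harness; in `FlinkProcedureITCase` the equivalent assertions would presumably
   be made against a `CALL sys.set_cluster_configs(...)` statement. The class and
   method names are illustrative only:

```java
/** Hypothetical sketch of the two failure scenarios; not the actual IT case. */
public class ConfigPairsValidationSketch {

    // Mirrors the length checks the procedure performs on its varargs.
    static void validatePairs(String... configPairs) {
        if (configPairs.length == 0) {
            throw new IllegalArgumentException("config_pairs cannot be null or empty.");
        }
        if (configPairs.length % 2 != 0) {
            throw new IllegalArgumentException("config_pairs must be set in pairs.");
        }
    }

    // Returns the validation error message, or null when the input is accepted.
    static String failureMessage(String... configPairs) {
        try {
            validatePairs(configPairs);
            return null;
        } catch (IllegalArgumentException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Scenario 1: no parameters passed.
        System.out.println(failureMessage());
        // Scenario 2: a key without a matching value.
        System.out.println(failureMessage("datalake.format"));
        // Control: a well-formed pair is accepted.
        System.out.println(failureMessage("datalake.format", "paimon"));
    }
}
```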


