Copilot commented on code in PR #2279: URL: https://github.com/apache/fluss/pull/2279#discussion_r2659720711
########## fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.procedure; + +import org.apache.fluss.config.cluster.AlterConfig; +import org.apache.fluss.config.cluster.AlterConfigOpType; + +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.ProcedureHint; +import org.apache.flink.table.procedure.ProcedureContext; + +import java.util.ArrayList; +import java.util.List; + +/** + * Procedure to reset cluster configuration dynamically. + * + * <p>This procedure allows modifying dynamic cluster configurations. The changes are: + * + * <ul> + * <li>Validated by the CoordinatorServer before persistence + * <li>Persisted in ZooKeeper for durability + * <li>Applied to all relevant servers (Coordinator and TabletServers) + * <li>Survive server restarts + * </ul> + * + * <p>Usage examples: + * + * <pre> + * -- reset a configuration + * CALL sys.reset_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec'); + * + * -- reset multiple configurations at one time + * CALL sys.reset_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec','datalake.format'); Review Comment: Missing space after the comma in the example. Should be: 'CALL sys.reset_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', 'datalake.format');' ```suggestion * CALL sys.reset_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', 'datalake.format'); ``` ########## fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java: ########## @@ -383,12 +408,27 @@ void testSetClusterConfigValidation() throws Exception { () -> tEnv.executeSql( String.format( - "Call %s.sys.set_cluster_config('invalid.config.key', 'value')", + "Call %s.sys.set_cluster_configs('invalid.config.key', 'value')", CATALOG_NAME)) .await()) .rootCause() .hasMessageContaining( "The config key invalid.config.key is not allowed to be changed dynamically"); + + // validation to ensure an even number of arguments are passed + assertThatThrownBy( + () -> + tEnv.executeSql( + String.format( + "Call %s.sys.set_cluster_configs('%s')", + CATALOG_NAME, + ConfigOptions + .KV_SHARED_RATE_LIMITER_BYTES_PER_SEC + .key())) + .await()) + .rootCause() + .hasMessageContaining( + "config_pairs must be set in pairs. Please specify a valid configuration pairs."); Review Comment: The expected error message uses "configuration pairs." with a period before the end quote, but this should be "configuration pairs" without the period for proper grammar. The period should only appear after the closing quote in a sentence. ```suggestion "config_pairs must be set in pairs. Please specify valid \"configuration pairs\"."); ``` ########## fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.procedure; + +import org.apache.fluss.config.cluster.AlterConfig; +import org.apache.fluss.config.cluster.AlterConfigOpType; + +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.ProcedureHint; +import org.apache.flink.table.procedure.ProcedureContext; + +import java.util.ArrayList; +import java.util.List; + +/** + * Procedure to reset cluster configuration dynamically. + * + * <p>This procedure allows modifying dynamic cluster configurations. The changes are: + * + * <ul> + * <li>Validated by the CoordinatorServer before persistence + * <li>Persisted in ZooKeeper for durability + * <li>Applied to all relevant servers (Coordinator and TabletServers) + * <li>Survive server restarts + * </ul> + * + * <p>Usage examples: + * + * <pre> + * -- reset a configuration + * CALL sys.reset_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec'); + * + * -- reset multiple configurations at one time + * CALL sys.reset_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec','datalake.format'); + * + * </pre> + * + * <p><b>Note:</b> Not all configurations support dynamic changes. The server will validate the + * change and reject it if the configuration cannot be reset dynamically. + */ +public class ResetClusterConfigsProcedure extends ProcedureBase { + + @ProcedureHint( + argument = {@ArgumentHint(name = "config_keys", type = @DataTypeHint("STRING"))}, + isVarArgs = true) + public String[] call(ProcedureContext context, String... configKeys) throws Exception { + try { + // Validate config key + if (configKeys.length == 0) { + throw new IllegalArgumentException( + "config_pairs cannot be null or empty. " + + "Please specify valid configuration keys."); + } + + List<AlterConfig> configList = new ArrayList<>(); + StringBuilder resultMessage = new StringBuilder(); + + for (String key : configKeys) { + String configKey = key.trim(); + if (configKey.isEmpty()) { + throw new IllegalArgumentException( + "Config key cannot be null or empty. " + + "Please specify valid configuration key."); Review Comment: The error message should say "a valid configuration key" (singular) instead of "valid configuration key" to be grammatically correct and consistent with similar error messages in the codebase. ```suggestion + "Please specify a valid configuration key."); ``` ########## fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.procedure; + +import org.apache.fluss.config.cluster.AlterConfig; +import org.apache.fluss.config.cluster.AlterConfigOpType; + +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.ProcedureHint; +import org.apache.flink.table.procedure.ProcedureContext; + +import java.util.ArrayList; +import java.util.List; + +/** + * Procedure to reset cluster configuration dynamically. + * + * <p>This procedure allows modifying dynamic cluster configurations. The changes are: + * + * <ul> + * <li>Validated by the CoordinatorServer before persistence + * <li>Persisted in ZooKeeper for durability + * <li>Applied to all relevant servers (Coordinator and TabletServers) + * <li>Survive server restarts + * </ul> + * + * <p>Usage examples: + * + * <pre> + * -- reset a configuration + * CALL sys.reset_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec'); + * + * -- reset multiple configurations at one time + * CALL sys.reset_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec','datalake.format'); + * + * </pre> + * + * <p><b>Note:</b> Not all configurations support dynamic changes. The server will validate the + * change and reject it if the configuration cannot be reset dynamically. + */ +public class ResetClusterConfigsProcedure extends ProcedureBase { + + @ProcedureHint( + argument = {@ArgumentHint(name = "config_keys", type = @DataTypeHint("STRING"))}, + isVarArgs = true) + public String[] call(ProcedureContext context, String... configKeys) throws Exception { + try { + // Validate config key + if (configKeys.length == 0) { + throw new IllegalArgumentException( + "config_pairs cannot be null or empty. " Review Comment: The error message mentions "config_pairs" but should mention "config_keys" to match the parameter name used in this procedure. The parameter is named "config_keys" in the @ArgumentHint annotation. ```suggestion "config_keys cannot be null or empty. " ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
