This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 9e6f6a4de [kv] Support shared RocksDB rate limiter (#2178)
9e6f6a4de is described below

commit 9e6f6a4de000becf3bc8ac40e8a443939c828c50
Author: Yang Wang <[email protected]>
AuthorDate: Wed Dec 24 01:15:47 2025 +0800

    [kv] Support shared RocksDB rate limiter (#2178)
---
 .../org/apache/fluss/config/ConfigOptions.java     |  54 +++
 .../fluss/config/cluster/ConfigValidator.java      |  78 +++++
 .../flink/procedure/GetClusterConfigProcedure.java | 111 +++++++
 .../fluss/flink/procedure/ProcedureManager.java    |   4 +-
 .../flink/procedure/SetClusterConfigProcedure.java | 128 +++++++
 .../flink/procedure/FlinkProcedureITCase.java      | 137 +++++++-
 .../apache/fluss/server/DynamicConfigManager.java  |  14 +
 .../apache/fluss/server/DynamicServerConfig.java   | 369 ++++++++++++++++++---
 .../server/coordinator/CoordinatorServer.java      |   3 +
 .../java/org/apache/fluss/server/kv/KvManager.java | 135 +++++++-
 .../java/org/apache/fluss/server/kv/KvTablet.java  |  44 +--
 .../kv/rocksdb/RocksDBResourceContainer.java       |  24 +-
 .../apache/fluss/server/tablet/TabletServer.java   |   6 +-
 .../fluss/server/DynamicConfigChangeTest.java      |  93 +++++-
 .../org/apache/fluss/server/kv/KvTabletTest.java   |   3 +-
 website/docs/engine-flink/datastream.mdx           |   2 +-
 website/docs/engine-flink/delta-joins.md           |   2 +-
 website/docs/engine-flink/getting-started.md       |   1 +
 website/docs/engine-flink/lookups.md               |   2 +-
 website/docs/engine-flink/options.md               |   2 +-
 website/docs/engine-flink/procedures.md            | 255 ++++++++++++++
 website/docs/engine-flink/reads.md                 |   2 +-
 website/docs/engine-flink/writes.md                |   2 +-
 website/docs/maintenance/configuration.md          |   1 +
 .../maintenance/operations/updating-configs.md     |  14 +-
 25 files changed, 1380 insertions(+), 106 deletions(-)

diff --git 
a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java 
b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
index 43c244554..c1059ac06 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
@@ -28,10 +28,12 @@ import org.apache.fluss.metadata.LogFormat;
 import org.apache.fluss.metadata.MergeEngineType;
 import org.apache.fluss.utils.ArrayUtils;
 
+import java.lang.reflect.Field;
 import java.time.Duration;
 import java.time.ZoneId;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -1576,6 +1578,17 @@ public class ConfigOptions {
                             "The max size of the consumed memory for RocksDB 
batch write, "
                                     + "will flush just based on item count if 
this config set to 0.");
 
+    public static final ConfigOption<MemorySize> 
KV_SHARED_RATE_LIMITER_BYTES_PER_SEC =
+            key("kv.rocksdb.shared-rate-limiter.bytes-per-sec")
+                    .memoryType()
+                    .defaultValue(new MemorySize(Long.MAX_VALUE))
+                    .withDescription(
+                            "The shared rate limit in bytes per second for 
RocksDB flush and compaction operations "
+                                    + "across all RocksDB instances in the 
TabletServer. "
+                                    + "All KV tablets share a single global 
RateLimiter to prevent disk IO from being saturated. "
+                                    + "The RateLimiter is always enabled. The 
default value is Long.MAX_VALUE (effectively unlimited). "
+                                    + "Set to a lower value (e.g., 100MB) to 
limit the rate.");
+
     // 
--------------------------------------------------------------------------
     // Provided configurable ColumnFamilyOptions within Fluss
     // 
--------------------------------------------------------------------------
@@ -1869,4 +1882,45 @@ public class ConfigOptions {
         LZ4,
         ZSTD
     }
+
+    // ------------------------------------------------------------------------
+    //  ConfigOptions Registry and Utilities
+    // ------------------------------------------------------------------------
+
+    /**
+     * Holder class for lazy initialization of the ConfigOptions registry. The registry is built
+     * only when this holder is first accessed through {@code getConfigOption(String)}; by then the
+     * static initializer of ConfigOptions has already run, so all ConfigOption fields are assigned.
+     */
+    private static class ConfigOptionsHolder {
+        private static final Map<String, ConfigOption<?>> 
CONFIG_OPTIONS_BY_KEY;
+
+        static {
+            Map<String, ConfigOption<?>> options = new HashMap<>();
+            Field[] fields = ConfigOptions.class.getFields();
+            for (Field field : fields) {
+                if (!ConfigOption.class.isAssignableFrom(field.getType())) {
+                    continue;
+                }
+                try {
+                    ConfigOption<?> configOption = (ConfigOption<?>) 
field.get(null);
+                    options.put(configOption.key(), configOption);
+                } catch (IllegalAccessException e) {
+                    // Ignore fields that cannot be accessed
+                }
+            }
+            CONFIG_OPTIONS_BY_KEY = Collections.unmodifiableMap(options);
+        }
+    }
+
+    /**
+     * Gets the ConfigOption for a given key.
+     *
+     * @param key the configuration key
+     * @return the ConfigOption if found, null otherwise
+     */
+    @Internal
+    public static ConfigOption<?> getConfigOption(String key) {
+        return ConfigOptionsHolder.CONFIG_OPTIONS_BY_KEY.get(key);
+    }
 }
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/config/cluster/ConfigValidator.java
 
b/fluss-common/src/main/java/org/apache/fluss/config/cluster/ConfigValidator.java
new file mode 100644
index 000000000..deb931323
--- /dev/null
+++ 
b/fluss-common/src/main/java/org/apache/fluss/config/cluster/ConfigValidator.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.config.cluster;
+
+import org.apache.fluss.annotation.PublicEvolving;
+import org.apache.fluss.exception.ConfigException;
+
+import javax.annotation.Nullable;
+
+/**
+ * Validator for a single dynamic configuration key.
+ *
+ * <p>Unlike {@link ServerReconfigurable}, validators are stateless and only 
perform validation
+ * logic without requiring component instances. This allows coordinators to 
validate configurations
+ * for components they don't run (e.g., KvManager).
+ *
+ * <p>Example use case: CoordinatorServer needs to validate KV-related 
configurations even though it
+ * doesn't have a KvManager instance. A {@link ConfigValidator} can be 
registered on both
+ * CoordinatorServer (for validation) and TabletServer (for both validation 
and actual
+ * reconfiguration via {@link ServerReconfigurable}).
+ *
+ * <p>Each validator monitors a single configuration key. The validator will 
only be invoked when
+ * that specific key changes, improving validation efficiency.
+ *
+ * <p>This interface is designed to be stateless and thread-safe. 
Implementations should not rely on
+ * any mutable component state.
+ *
+ * <p><b>Type-safe validation:</b> The validator receives strongly-typed 
values that have already
+ * been parsed and validated for basic type correctness. This avoids redundant 
string parsing and
+ * allows validators to focus on business logic validation.
+ *
+ * @param <T> the type of the configuration value being validated
+ */
+@PublicEvolving
+public interface ConfigValidator<T> {
+
+    /**
+     * Returns the configuration key this validator monitors.
+     *
+     * <p>The validator will only be invoked when this specific configuration 
key changes. This
+     * allows efficient filtering of validators and clear declaration of 
dependencies.
+     *
+     * @return the configuration key to monitor, must not be null or empty
+     */
+    String configKey();
+
+    /**
+     * Validates a configuration value change.
+     *
+     * <p>This method is called when the monitored configuration key changes. 
It should check
+     * whether the new value is valid, potentially considering the old value 
and validation rules.
+     *
+     * <p>The method should be stateless and deterministic - given the same 
old and new values, it
+     * should always produce the same validation result.
+     *
+     * @param oldValue the previous value of the configuration key, null if 
the key was not set
+     *     before
+     * @param newValue the new value of the configuration key, null if the key 
is being deleted
+     * @throws ConfigException if the configuration change is invalid, with a 
descriptive error
+     *     message explaining why the change cannot be applied
+     */
+    void validate(@Nullable T oldValue, @Nullable T newValue) throws 
ConfigException;
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigProcedure.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigProcedure.java
new file mode 100644
index 000000000..444f1dd13
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigProcedure.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.procedure;
+
+import org.apache.fluss.config.cluster.ConfigEntry;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.types.Row;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Procedure to get cluster configuration(s).
+ *
+ * <p>This procedure allows querying dynamic cluster configurations. It can 
retrieve:
+ *
+ * <ul>
+ *   <li>A specific configuration key
+ *   <li>All configurations (when key parameter is null or empty)
+ * </ul>
+ *
+ * <p>Usage examples:
+ *
+ * <pre>
+ * -- Get a specific configuration
+ * CALL sys.get_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec');
+ *
+ * -- Get all cluster configurations
+ * CALL sys.get_cluster_config();
+ * </pre>
+ */
+public class GetClusterConfigProcedure extends ProcedureBase {
+
+    @ProcedureHint(
+            output =
+                    @DataTypeHint(
+                            "ROW<config_key STRING, config_value STRING, 
config_source STRING>"))
+    public Row[] call(ProcedureContext context) throws Exception {
+        return getConfigs(null);
+    }
+
+    @ProcedureHint(
+            argument = {@ArgumentHint(name = "config_key", type = 
@DataTypeHint("STRING"))},
+            output =
+                    @DataTypeHint(
+                            "ROW<config_key STRING, config_value STRING, 
config_source STRING>"))
+    public Row[] call(ProcedureContext context, String configKey) throws 
Exception {
+        return getConfigs(configKey);
+    }
+
+    private Row[] getConfigs(@Nullable String configKey) throws Exception {
+        try {
+            // Get all cluster configurations
+            Collection<ConfigEntry> configs = 
admin.describeClusterConfigs().get();
+
+            List<Row> results = new ArrayList<>();
+
+            if (configKey == null || configKey.isEmpty()) {
+                // Return all configurations
+                for (ConfigEntry entry : configs) {
+                    results.add(
+                            Row.of(
+                                    entry.key(),
+                                    entry.value(),
+                                    entry.source() != null ? 
entry.source().name() : "UNKNOWN"));
+                }
+            } else {
+                // Find specific configuration
+                for (ConfigEntry entry : configs) {
+                    if (entry.key().equals(configKey)) {
+                        results.add(
+                                Row.of(
+                                        entry.key(),
+                                        entry.value(),
+                                        entry.source() != null
+                                                ? entry.source().name()
+                                                : "UNKNOWN"));
+                        break;
+                    }
+                }
+            }
+
+            return results.toArray(new Row[0]);
+
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to get cluster config: " + 
e.getMessage(), e);
+        }
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java
index 0767b3f23..dcc5e223e 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java
@@ -69,7 +69,9 @@ public class ProcedureManager {
     private enum ProcedureEnum {
         ADD_ACL("sys.add_acl", AddAclProcedure.class),
         DROP_ACL("sys.drop_acl", DropAclProcedure.class),
-        List_ACL("sys.list_acl", ListAclProcedure.class);
+        List_ACL("sys.list_acl", ListAclProcedure.class),
+        SET_CLUSTER_CONFIG("sys.set_cluster_config", 
SetClusterConfigProcedure.class),
+        GET_CLUSTER_CONFIG("sys.get_cluster_config", 
GetClusterConfigProcedure.class);
 
         private final String path;
         private final Class<? extends ProcedureBase> procedureClass;
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigProcedure.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigProcedure.java
new file mode 100644
index 000000000..ee4dbcf37
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigProcedure.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.procedure;
+
+import org.apache.fluss.config.cluster.AlterConfig;
+import org.apache.fluss.config.cluster.AlterConfigOpType;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+
+/**
+ * Procedure to set or delete cluster configuration dynamically.
+ *
+ * <p>This procedure allows modifying dynamic cluster configurations. The 
changes are:
+ *
+ * <ul>
+ *   <li>Validated by the CoordinatorServer before persistence
+ *   <li>Persisted in ZooKeeper for durability
+ *   <li>Applied to all relevant servers (Coordinator and TabletServers)
+ *   <li>Survive server restarts
+ * </ul>
+ *
+ * <p>Usage examples:
+ *
+ * <pre>
+ * -- Set a configuration
+ * CALL sys.set_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec', 
'200MB');
+ * CALL sys.set_cluster_config('datalake.format', 'paimon');
+ *
+ * -- Delete a configuration (reset to default)
+ * CALL sys.set_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec', 
NULL);
+ * CALL sys.set_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec', 
'');
+ * </pre>
+ *
+ * <p><b>Note:</b> Not all configurations support dynamic changes. The server 
will validate the
+ * change and reject it if the configuration cannot be modified dynamically or 
if the new value is
+ * invalid.
+ */
+public class SetClusterConfigProcedure extends ProcedureBase {
+
+    @ProcedureHint(argument = {@ArgumentHint(name = "config_key", type = 
@DataTypeHint("STRING"))})
+    public String[] call(ProcedureContext context, String configKey) throws 
Exception {
+        return performSet(configKey, null);
+    }
+
+    @ProcedureHint(
+            argument = {
+                @ArgumentHint(name = "config_key", type = 
@DataTypeHint("STRING")),
+                @ArgumentHint(name = "config_value", type = 
@DataTypeHint("STRING"))
+            })
+    public String[] call(ProcedureContext context, String configKey, String 
configValue)
+            throws Exception {
+        return performSet(configKey, configValue);
+    }
+
+    private String[] performSet(String configKey, @Nullable String 
configValue) throws Exception {
+
+        try {
+            // Validate config key
+            if (configKey == null || configKey.trim().isEmpty()) {
+                throw new IllegalArgumentException(
+                        "Config key cannot be null or empty. "
+                                + "Please specify a valid configuration key.");
+            }
+
+            configKey = configKey.trim();
+
+            // Determine operation type
+            AlterConfigOpType opType;
+            String operationDesc;
+
+            if (configValue == null || configValue.trim().isEmpty()) {
+                // Delete operation - reset to default
+                opType = AlterConfigOpType.DELETE;
+                operationDesc = "deleted (reset to default)";
+            } else {
+                // Set operation
+                opType = AlterConfigOpType.SET;
+                operationDesc = String.format("set to '%s'", configValue);
+            }
+
+            // Construct configuration modification operation.
+            AlterConfig alterConfig = new AlterConfig(configKey, configValue, 
opType);
+
+            // Call Admin API to modify cluster configuration
+            // This will trigger validation on CoordinatorServer before 
persistence
+            
admin.alterClusterConfigs(Collections.singletonList(alterConfig)).get();
+
+            return new String[] {
+                String.format(
+                        "Successfully %s configuration '%s'. "
+                                + "The change is persisted in ZooKeeper and 
applied to all servers.",
+                        operationDesc, configKey)
+            };
+
+        } catch (IllegalArgumentException e) {
+            // Re-throw validation errors with original message
+            throw e;
+        } catch (Exception e) {
+            // Wrap other exceptions with more context
+            throw new RuntimeException(
+                    String.format(
+                            "Failed to set cluster config '%s': %s", 
configKey, e.getMessage()),
+                    e);
+        }
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java
index 92956432d..61ed2c78d 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java
@@ -91,7 +91,12 @@ public abstract class FlinkProcedureITCase {
         try (CloseableIterator<Row> showProceduresIterator =
                 tEnv.executeSql("show procedures").collect()) {
             List<String> expectedShowProceduresResult =
-                    Arrays.asList("+I[sys.add_acl]", "+I[sys.drop_acl]", 
"+I[sys.list_acl]");
+                    Arrays.asList(
+                            "+I[sys.add_acl]",
+                            "+I[sys.drop_acl]",
+                            "+I[sys.get_cluster_config]",
+                            "+I[sys.list_acl]",
+                            "+I[sys.set_cluster_config]");
             // make sure no more results is unread.
             assertResultsIgnoreOrder(showProceduresIterator, 
expectedShowProceduresResult, true);
         }
@@ -259,6 +264,133 @@ public abstract class FlinkProcedureITCase {
         }
     }
 
+    @Test
+    void testGetClusterConfig() throws Exception {
+        // Get specific config
+        try (CloseableIterator<Row> resultIterator =
+                tEnv.executeSql(
+                                String.format(
+                                        "Call %s.sys.get_cluster_config('%s')",
+                                        CATALOG_NAME,
+                                        
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()))
+                        .collect()) {
+            List<Row> results = CollectionUtil.iteratorToList(resultIterator);
+            assertThat(results).hasSize(1);
+            Row row = results.get(0);
+            assertThat(row.getField(0))
+                    
.isEqualTo(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key());
+            assertThat(row.getField(1)).isEqualTo("100 mb");
+            assertThat(row.getField(2)).isNotNull(); // config_source
+        }
+
+        // Get all configs
+        try (CloseableIterator<Row> resultIterator =
+                tEnv.executeSql(String.format("Call 
%s.sys.get_cluster_config()", CATALOG_NAME))
+                        .collect()) {
+            List<Row> results = CollectionUtil.iteratorToList(resultIterator);
+            assertThat(results).isNotEmpty();
+        }
+
+        // Get non-existent config
+        try (CloseableIterator<Row> resultIterator =
+                tEnv.executeSql(
+                                String.format(
+                                        "Call 
%s.sys.get_cluster_config('non.existent.config')",
+                                        CATALOG_NAME))
+                        .collect()) {
+            List<Row> results = CollectionUtil.iteratorToList(resultIterator);
+            assertThat(results).hasSize(0);
+        }
+
+        // reset cluster configs.
+        tEnv.executeSql(
+                        String.format(
+                                "Call %s.sys.set_cluster_config('%s')",
+                                CATALOG_NAME,
+                                
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()))
+                .await();
+    }
+
+    @Test
+    void testSetClusterConfig() throws Exception {
+        // Test setting a valid config
+        try (CloseableIterator<Row> resultIterator =
+                tEnv.executeSql(
+                                String.format(
+                                        "Call %s.sys.set_cluster_config('%s', 
'200MB')",
+                                        CATALOG_NAME,
+                                        
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()))
+                        .collect()) {
+            List<Row> results = CollectionUtil.iteratorToList(resultIterator);
+            assertThat(results).hasSize(1);
+            assertThat(results.get(0).getField(0))
+                    .asString()
+                    .contains("Successfully set to '200MB'")
+                    
.contains(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key());
+        }
+
+        // Verify the config was actually set
+        try (CloseableIterator<Row> resultIterator =
+                tEnv.executeSql(
+                                String.format(
+                                        "Call %s.sys.get_cluster_config('%s')",
+                                        CATALOG_NAME,
+                                        
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()))
+                        .collect()) {
+            List<Row> results = CollectionUtil.iteratorToList(resultIterator);
+            assertThat(results).hasSize(1);
+            assertThat(results.get(0).getField(1)).isEqualTo("200MB");
+        }
+
+        // reset cluster configs.
+        tEnv.executeSql(
+                        String.format(
+                                "Call %s.sys.set_cluster_config('%s')",
+                                CATALOG_NAME,
+                                
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()))
+                .await();
+    }
+
+    @Test
+    void testDeleteClusterConfig() throws Exception {
+        // First set a config
+        tEnv.executeSql(
+                        String.format(
+                                "Call %s.sys.set_cluster_config('%s', 
'paimon')",
+                                CATALOG_NAME, 
ConfigOptions.DATALAKE_FORMAT.key()))
+                .await();
+
+        // Delete the config (reset to default) - omitting the value parameter
+        try (CloseableIterator<Row> resultIterator =
+                tEnv.executeSql(
+                                String.format(
+                                        "Call %s.sys.set_cluster_config('%s')",
+                                        CATALOG_NAME, 
ConfigOptions.DATALAKE_FORMAT.key()))
+                        .collect()) {
+            List<Row> results = CollectionUtil.iteratorToList(resultIterator);
+            assertThat(results).hasSize(1);
+            assertThat(results.get(0).getField(0))
+                    .asString()
+                    .contains("Successfully deleted")
+                    .contains(ConfigOptions.DATALAKE_FORMAT.key());
+        }
+    }
+
+    @Test
+    void testSetClusterConfigValidation() throws Exception {
+        // Try to set an invalid config (not allowed for dynamic change)
+        assertThatThrownBy(
+                        () ->
+                                tEnv.executeSql(
+                                                String.format(
+                                                        "Call 
%s.sys.set_cluster_config('invalid.config.key', 'value')",
+                                                        CATALOG_NAME))
+                                        .await())
+                .rootCause()
+                .hasMessageContaining(
+                        "The config key invalid.config.key is not allowed to 
be changed dynamically");
+    }
+
     private static Configuration initConfig() {
         Configuration conf = new Configuration();
         conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3);
@@ -272,6 +404,9 @@ public abstract class FlinkProcedureITCase {
         conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE, 
MemorySize.parse("1mb"));
         conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, 
MemorySize.parse("1kb"));
 
+        // Use a finite shared RocksDB rate limit (the limiter itself is always enabled) for testing
+        conf.set(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC, 
MemorySize.parse("100mb"));
+
         // set security information.
         conf.setString(ConfigOptions.SERVER_SECURITY_PROTOCOL_MAP.key(), 
"CLIENT:sasl");
         conf.setString("security.sasl.enabled.mechanisms", "plain");
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/DynamicConfigManager.java 
b/fluss-server/src/main/java/org/apache/fluss/server/DynamicConfigManager.java
index 1580a1c18..a6b678c2e 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/DynamicConfigManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/DynamicConfigManager.java
@@ -21,6 +21,7 @@ import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.config.cluster.AlterConfig;
 import org.apache.fluss.config.cluster.ConfigEntry;
+import org.apache.fluss.config.cluster.ConfigValidator;
 import org.apache.fluss.config.cluster.ServerReconfigurable;
 import org.apache.fluss.exception.ConfigException;
 import org.apache.fluss.server.authorizer.ZkNodeChangeNotificationWatcher;
@@ -76,6 +77,19 @@ public class DynamicConfigManager {
         dynamicServerConfig.register(serverReconfigurable);
     }
 
+    /**
+     * Register a ConfigValidator for stateless validation.
+     *
+     * <p>Typically used by CoordinatorServer to validate configs for 
components it doesn't run
+     * (e.g., KvManager). Validators are stateless and only perform validation 
without requiring
+     * component instances.
+     *
+     * @param validator the config validator to register
+     */
+    public void registerValidator(ConfigValidator<?> validator) {
+        dynamicServerConfig.registerValidator(validator);
+    }
+
     public void close() {
         configChangeListener.stop();
     }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java 
b/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java
index 7e767a148..bb19aa331 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java
@@ -18,7 +18,10 @@
 package org.apache.fluss.server;
 
 import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.config.ConfigOption;
+import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.cluster.ConfigValidator;
 import org.apache.fluss.config.cluster.ServerReconfigurable;
 import org.apache.fluss.exception.ConfigException;
 import org.apache.fluss.utils.MapUtils;
@@ -26,15 +29,20 @@ import org.apache.fluss.utils.MapUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
+import static 
org.apache.fluss.config.ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC;
 import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
 import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
 
@@ -49,13 +57,19 @@ class DynamicServerConfig {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(DynamicServerConfig.class);
     private static final Set<String> ALLOWED_CONFIG_KEYS =
-            Collections.singleton(DATALAKE_FORMAT.key());
+            new HashSet<>(
+                    Arrays.asList(
+                            DATALAKE_FORMAT.key(), 
KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()));
     private static final Set<String> ALLOWED_CONFIG_PREFIXES = 
Collections.singleton("datalake.");
 
     private final ReadWriteLock lock = new ReentrantReadWriteLock();
     private final Map<Class<? extends ServerReconfigurable>, 
ServerReconfigurable>
             serverReconfigures = MapUtils.newConcurrentHashMap();
 
+    /** Registered stateless config validators, organized by config key for 
efficient lookup. */
+    private final Map<String, List<ConfigValidator<?>>> configValidatorsByKey =
+            MapUtils.newConcurrentHashMap();
+
     /** The initial configuration items when the server starts from 
server.yaml. */
     private final Map<String, String> initialConfigMap;
 
@@ -83,6 +97,25 @@ class DynamicServerConfig {
         serverReconfigures.put(serverReconfigurable.getClass(), 
serverReconfigurable);
     }
 
+    /**
+     * Adds a stateless {@link ConfigValidator} to the registry.
+     *
+     * <p>Typical caller is CoordinatorServer, which must validate configs of components it does
+     * not host itself (e.g., KvManager). Since validators only validate and need no component
+     * instance, they can be registered on any server.
+     *
+     * <p>The registry is keyed by {@link ConfigValidator#configKey()}, so the validators of one
+     * key are found with a single lookup; several validators may share the same key.
+     *
+     * @param validator the config validator to register
+     */
+    void registerValidator(ConfigValidator<?> validator) {
+        List<ConfigValidator<?>> validatorsOfKey =
+                configValidatorsByKey.computeIfAbsent(
+                        validator.configKey(), key -> new CopyOnWriteArrayList<>());
+        validatorsOfKey.add(validator);
+    }
+
     /**
      * Update the dynamic configuration and apply to registered 
ServerReconfigurable. If skipping
      * error config, only the error one will be ignored.
@@ -115,68 +148,300 @@ class DynamicServerConfig {
 
     private void updateCurrentConfig(Map<String, String> newDynamicConfigs, 
boolean skipErrorConfig)
             throws Exception {
-        Map<String, String> newProps = new HashMap<>(initialConfigMap);
-        overrideProps(newProps, newDynamicConfigs);
-        Configuration newConfig = Configuration.fromMap(newProps);
-        Configuration oldConfig = currentConfig;
-        Set<ServerReconfigurable> appliedServerReconfigurableSet = new 
HashSet<>();
-        if (!newProps.equals(currentConfigMap)) {
-            serverReconfigures
-                    .values()
-                    .forEach(
-                            serverReconfigurable -> {
-                                try {
-                                    serverReconfigurable.validate(newConfig);
-                                } catch (ConfigException e) {
-                                    LOG.error(
-                                            "Validate new dynamic config error 
and will roll back all the applied config.",
-                                            e);
-                                    if (!skipErrorConfig) {
-                                        throw e;
-                                    }
-                                }
-                            });
-
-            Exception throwable = null;
-            for (ServerReconfigurable serverReconfigurable : 
serverReconfigures.values()) {
-                try {
-                    serverReconfigurable.reconfigure(newConfig);
-                    appliedServerReconfigurableSet.add(serverReconfigurable);
-                } catch (ConfigException e) {
-                    LOG.error(
-                            "Apply new dynamic error and will roll back all 
the applied config.",
-                            e);
-                    if (!skipErrorConfig) {
-                        throwable = e;
-                        break;
-                    }
-                }
+        // Compute effective config changes (merge with initial configs)
+        Map<String, String> effectiveChanges =
+                computeEffectiveChanges(newDynamicConfigs, skipErrorConfig);
+
+        // Early return if no effective changes
+        if (effectiveChanges.isEmpty()) {
+            LOG.info("No effective config changes detected for: {}", 
newDynamicConfigs);
+            return;
+        }
+
+        // Build new configuration by merging initial + dynamic configs
+        Map<String, String> newConfigMap = buildConfigMap(effectiveChanges);
+        Configuration newConfig = Configuration.fromMap(newConfigMap);
+
+        // Apply changes to all registered ServerReconfigurable instances
+        applyToServerReconfigurables(newConfig, skipErrorConfig);
+
+        // Update internal state
+        updateInternalState(newConfig, newConfigMap, newDynamicConfigs);
+        LOG.info("Dynamic configs changed: {}", effectiveChanges);
+    }
+
+    /**
+     * Collects the per-key changes that the new dynamic configs imply and that pass validation.
+     *
+     * @param newDynamicConfigs new dynamic configs from ZooKeeper
+     * @param skipErrorConfig whether invalid configs are skipped instead of failing the update
+     * @return validated changes keyed by config key; a null value marks a key to remove
+     * @throws ConfigException if validation fails and skipErrorConfig is false
+     */
+    private Map<String, String> computeEffectiveChanges(
+            Map<String, String> newDynamicConfigs, boolean skipErrorConfig) throws ConfigException {
+        Set<String> skipped = new HashSet<>();
+        Map<String, String> changes = new HashMap<>();
+
+        // Deletions go first: dynamic keys that vanished fall back to their initial value.
+        processDeletions(newDynamicConfigs, changes, skipped, skipErrorConfig);
+
+        // Then the keys that are present in the new dynamic configs (added or modified).
+        processModifications(newDynamicConfigs, changes, skipped, skipErrorConfig);
+
+        if (skipped.isEmpty()) {
+            return changes;
+        }
+        LOG.warn("Skipped invalid configs: {}", skipped);
+        return changes;
+    }
+
+    /** Handles dynamic keys that disappeared: they revert to the initial value or are removed. */
+    private void processDeletions(
+            Map<String, String> newDynamicConfigs,
+            Map<String, String> effectiveChanges,
+            Set<String> skippedConfigs,
+            boolean skipErrorConfig)
+            throws ConfigException {
+        for (String key : dynamicConfigs.keySet()) {
+            // Only keys missing from the new dynamic configs count as deleted.
+            boolean stillPresent = newDynamicConfigs.containsKey(key);
+            if (stillPresent) {
+                continue;
+            }
+
+            // The value from server.yaml is the fallback; null means the key is dropped.
+            String restored = initialConfigMap.get(key);
+            String current = currentConfigMap.get(key);
+
+            // Nothing to do when the effective value already equals the fallback.
+            boolean needsChange = !Objects.equals(current, restored);
+            if (!needsChange) {
+                continue;
+            }
+
+            // Record the change only if it passes validation (or throw, depending on flag).
+            if (validateConfigChange(key, current, restored, skippedConfigs, skipErrorConfig)) {
+                effectiveChanges.put(key, restored);
             }
+        }
+    }
+
+    /** Processes config additions and modifications. */
+    private void processModifications(
+            Map<String, String> newDynamicConfigs,
+            Map<String, String> effectiveChanges,
+            Set<String> skippedConfigs,
+            boolean skipErrorConfig)
+            throws ConfigException {
+        for (Map.Entry<String, String> entry : newDynamicConfigs.entrySet()) {
+            String configKey = entry.getKey();
+            String newValue = entry.getValue();
+            String currentValue = currentConfigMap.get(configKey);
 
-            // rollback to old config if there is an error.
-            if (throwable != null) {
-                appliedServerReconfigurableSet.forEach(
-                        serverReconfigurable -> 
serverReconfigurable.reconfigure(oldConfig));
-                throw throwable;
+            // Skip if value unchanged
+            if (Objects.equals(currentValue, newValue)) {
+                continue;
             }
 
-            currentConfig = newConfig;
-            currentConfigMap.clear();
-            dynamicConfigs.clear();
-            currentConfigMap.putAll(newProps);
-            dynamicConfigs.putAll(newDynamicConfigs);
-            LOG.info("Dynamic configs changed: {}", newDynamicConfigs);
+            // Validate and add to effective changes
+            if (validateConfigChange(
+                    configKey, currentValue, newValue, skippedConfigs, 
skipErrorConfig)) {
+                effectiveChanges.put(configKey, newValue);
+            }
         }
     }
 
-    private void overrideProps(Map<String, String> props, Map<String, String> 
propsOverride) {
-        propsOverride.forEach(
+    /**
+     * Runs the validation of one config change and applies the skip-or-fail policy on error.
+     *
+     * @return true if the change is valid; false if it is invalid and was recorded as skipped
+     * @throws ConfigException if validation fails and skipErrorConfig is false
+     */
+    private boolean validateConfigChange(
+            String configKey,
+            String oldValue,
+            String newValue,
+            Set<String> skippedConfigs,
+            boolean skipErrorConfig)
+            throws ConfigException {
+        try {
+            validateSingleConfig(configKey, oldValue, newValue);
+        } catch (ConfigException e) {
+            LOG.error(
+                    "Config validation failed for '{}': {} -> {}. {}",
+                    configKey,
+                    oldValue,
+                    newValue,
+                    e.getMessage());
+            if (!skipErrorConfig) {
+                throw e;
+            }
+            // Remember the rejected key so the caller can report everything it skipped.
+            skippedConfigs.add(configKey);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Builds the new effective config map by applying the effective changes on top of the current
+     * effective config.
+     *
+     * <p>The current map (not the initial one) has to be the base: {@code effectiveChanges} only
+     * holds keys whose value differs from the current value, so dynamic overrides that stay
+     * unchanged in this update would otherwise silently fall back to their initial values.
+     *
+     * @param effectiveChanges validated changes; a null value removes the key
+     * @return a new map, independent of the maps held by this instance
+     */
+    private Map<String, String> buildConfigMap(Map<String, String> effectiveChanges) {
+        // Copy, so the live map stays untouched until updateInternalState() swaps it in.
+        Map<String, String> configMap = new HashMap<>(currentConfigMap);
+        effectiveChanges.forEach(
                 (key, value) -> {
                     if (value == null) {
-                        props.remove(key);
+                        configMap.remove(key);
                     } else {
-                        props.put(key, value);
+                        configMap.put(key, value);
                     }
                 });
+        return configMap;
+    }
+
+    /**
+     * Replaces the in-memory view of the configuration with the given new one.
+     *
+     * <p>updateCurrentConfig() calls this only after applyToServerReconfigurables() returned
+     * without throwing, so a failed update leaves the previous state in place.
+     *
+     * @param newConfig the new effective configuration
+     * @param newConfigMap the key/value map that {@code newConfig} was built from
+     * @param newDynamicConfigs the dynamic configs this update was computed from
+     */
+    private void updateInternalState(
+            Configuration newConfig,
+            Map<String, String> newConfigMap,
+            Map<String, String> newDynamicConfigs) {
+        currentConfig = newConfig;
+        // The existing map instances are kept and refilled instead of being replaced.
+        currentConfigMap.clear();
+        currentConfigMap.putAll(newConfigMap);
+        // Stored exactly as received, i.e. including entries that were skipped as invalid.
+        dynamicConfigs.clear();
+        dynamicConfigs.putAll(newDynamicConfigs);
+    }
+
+    /**
+     * Checks one config change: the key must be known, the new value must parse as the option's
+     * type, and all validators registered for the key must accept the change.
+     *
+     * @param configKey config key
+     * @param oldValueStr current value as string, or null if the key currently has no value
+     * @param newValueStr requested value as string, or null if the key is being removed
+     * @throws ConfigException if validation fails
+     */
+    private void validateSingleConfig(String configKey, String oldValueStr, String newValueStr)
+            throws ConfigException {
+        // The ConfigOption carries the type information; it is null for unknown keys.
+        ConfigOption<?> option = ConfigOptions.getConfigOption(configKey);
+
+        // Keys under an allowed prefix (like "datalake.") need no ConfigOption: their business
+        // validation is left to the ServerReconfigurable implementations.
+        boolean underAllowedPrefix =
+                ALLOWED_CONFIG_PREFIXES.stream().anyMatch(configKey::startsWith);
+        if (option == null) {
+            if (!underAllowedPrefix) {
+                throw new ConfigException(
+                        String.format("No ConfigOption found for config key: %s", configKey));
+            }
+            // Without a ConfigOption there is no type to parse and no typed validator call.
+            return;
+        }
+
+        // Type check: parse the new value through a throw-away Configuration.
+        Object newValue = null;
+        if (newValueStr != null) {
+            Configuration scratch = new Configuration();
+            scratch.setString(configKey, newValueStr);
+            try {
+                newValue = scratch.getOptional(option).get();
+            } catch (Exception e) {
+                String cause = e.getMessage();
+                if (cause == null) {
+                    cause = e.getClass().getSimpleName();
+                }
+                String typeName = option.getClazz().getSimpleName();
+                if (option.isList()) {
+                    typeName = "List<" + typeName + ">";
+                }
+                throw new ConfigException(
+                        String.format(
+                                "Cannot parse '%s' as %s for config '%s': %s",
+                                newValueStr, typeName, configKey, cause),
+                        e);
+            }
+        }
+
+        // Business validation with the validators registered for this key (if any).
+        List<ConfigValidator<?>> validators = configValidatorsByKey.get(configKey);
+        if (validators == null || validators.isEmpty()) {
+            return;
+        }
+        Object oldValue = null;
+        if (oldValueStr != null) {
+            // The old value is read in typed form from the current configuration.
+            oldValue = currentConfig.getOptional(option).orElse(null);
+        }
+        for (ConfigValidator<?> validator : validators) {
+            invokeValidator(validator, oldValue, newValue);
+        }
+    }
+
+    /**
+     * Validates the new configuration against every registered ServerReconfigurable and then
+     * applies it, rolling back the already reconfigured instances if one of them fails.
+     *
+     * <p>With {@code skipErrorConfig} set, failures are only logged: a rejected validation does
+     * not stop the instances from being reconfigured, and no rollback takes place.
+     *
+     * @param newConfig new configuration to apply
+     * @param skipErrorConfig whether to log and skip errors instead of failing the update
+     * @throws Exception the first validation or reconfiguration failure if skipErrorConfig is
+     *     false; failures of the subsequent rollback are attached as suppressed exceptions
+     */
+    private void applyToServerReconfigurables(Configuration newConfig, boolean skipErrorConfig)
+            throws Exception {
+        Configuration oldConfig = currentConfig;
+        Set<ServerReconfigurable> appliedSet = new HashSet<>();
+
+        // Validate all first, so an invalid config is rejected before anything is reconfigured.
+        for (ServerReconfigurable reconfigurable : serverReconfigures.values()) {
+            try {
+                reconfigurable.validate(newConfig);
+            } catch (ConfigException e) {
+                LOG.error(
+                        "Validation failed for {}: {}",
+                        reconfigurable.getClass().getSimpleName(),
+                        e.getMessage(),
+                        e);
+                if (!skipErrorConfig) {
+                    throw e;
+                }
+            }
+        }
+
+        // Apply to all instances, remembering the successful ones for a possible rollback.
+        Exception throwable = null;
+        for (ServerReconfigurable reconfigurable : serverReconfigures.values()) {
+            try {
+                reconfigurable.reconfigure(newConfig);
+                appliedSet.add(reconfigurable);
+            } catch (ConfigException e) {
+                LOG.error(
+                        "Reconfiguration failed for {}: {}",
+                        reconfigurable.getClass().getSimpleName(),
+                        e.getMessage(),
+                        e);
+                if (!skipErrorConfig) {
+                    throwable = e;
+                    break;
+                }
+            }
+        }
+
+        if (throwable == null) {
+            return;
+        }
+
+        // Roll back best-effort: a failing rollback must neither keep the remaining instances
+        // on the new config nor replace the original failure that is reported to the caller.
+        for (ServerReconfigurable applied : appliedSet) {
+            try {
+                applied.reconfigure(oldConfig);
+            } catch (Exception rollbackFailure) {
+                LOG.error(
+                        "Rollback failed for {}: {}",
+                        applied.getClass().getSimpleName(),
+                        rollbackFailure.getMessage(),
+                        rollbackFailure);
+                // addSuppressed rejects self-suppression, so guard against the same instance.
+                if (rollbackFailure != throwable) {
+                    throwable.addSuppressed(rollbackFailure);
+                }
+            }
+        }
+        throw throwable;
+    }
+
+    /**
+     * Hands the typed old and new values to a validator whose value type is unknown here.
+     *
+     * @param validator the config validator to invoke
+     * @param oldValue the old typed value
+     * @param newValue the new typed value
+     * @throws ConfigException if validation fails
+     */
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private void invokeValidator(ConfigValidator<?> validator, Object oldValue, Object newValue)
+            throws ConfigException {
+        // The raw view is required to pass plain Objects. This relies on the validator having
+        // been registered for a config key whose ConfigOption yields its value type.
+        ConfigValidator rawValidator = validator;
+        rawValidator.validate(oldValue, newValue);
     }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
index 83bba2c52..cd546f447 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
@@ -173,7 +173,10 @@ public class CoordinatorServer extends ServerBase {
 
             this.lakeCatalogDynamicLoader = new LakeCatalogDynamicLoader(conf, 
pluginManager, true);
             this.dynamicConfigManager = new DynamicConfigManager(zkClient, 
conf, true);
+
+            // Register server reconfigurable components
             dynamicConfigManager.register(lakeCatalogDynamicLoader);
+
             dynamicConfigManager.startup();
 
             this.coordinatorContext = new CoordinatorContext();
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java
index b5318ef60..435353090 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java
@@ -20,7 +20,10 @@ package org.apache.fluss.server.kv;
 import org.apache.fluss.compression.ArrowCompressionInfo;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.MemorySize;
 import org.apache.fluss.config.TableConfig;
+import org.apache.fluss.config.cluster.ServerReconfigurable;
+import org.apache.fluss.exception.ConfigException;
 import org.apache.fluss.exception.KvStorageException;
 import org.apache.fluss.fs.FileSystem;
 import org.apache.fluss.fs.FsPath;
@@ -45,6 +48,9 @@ import org.apache.fluss.utils.FlussPaths;
 import org.apache.fluss.utils.MapUtils;
 import org.apache.fluss.utils.types.Tuple2;
 
+import org.rocksdb.RateLimiter;
+import org.rocksdb.RateLimiterMode;
+import org.rocksdb.RocksDB;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,9 +71,42 @@ import static 
org.apache.fluss.utils.concurrent.LockUtils.inLock;
  * the individual instances.
  */
 @ThreadSafe
-public final class KvManager extends TabletManagerBase {
+public final class KvManager extends TabletManagerBase implements 
ServerReconfigurable {
 
     private static final Logger LOG = LoggerFactory.getLogger(KvManager.class);
+
+    /**
+     * Default global rate limiter with unlimited rate (Long.MAX_VALUE bytes 
per second).
+     *
+     * <p>This is used by RocksDBResourceContainer when no rate limiter is 
explicitly provided,
+     * ensuring the API is safer and more robust by avoiding null checks 
throughout the code.
+     */
+    private static final RateLimiter DEFAULT_RATE_LIMITER = 
createDefaultRateLimiter();
+
+    /**
+     * Builds the limiter stored in {@link #DEFAULT_RATE_LIMITER}.
+     *
+     * @return a rate limiter with a limit of Long.MAX_VALUE bytes per second (effectively none)
+     */
+    private static RateLimiter createDefaultRateLimiter() {
+        RocksDB.loadLibrary();
+        // Only the rate is passed, so the refill period and fairness keep their default values.
+        long unlimitedBytesPerSec = Long.MAX_VALUE;
+        return new RateLimiter(unlimitedBytesPerSec);
+    }
+
+    /**
+     * Gives access to the shared default rate limiter, which has an unlimited rate.
+     *
+     * <p>RocksDBResourceContainer falls back to this instance when its caller does not pass a
+     * rate limiter explicitly. Every caller receives the same instance.
+     *
+     * @return the default rate limiter instance
+     */
+    public static RateLimiter getDefaultRateLimiter() {
+        return KvManager.DEFAULT_RATE_LIMITER;
+    }
+
     private final LogManager logManager;
 
     private final TabletServerMetricGroup serverMetricGroup;
@@ -89,6 +128,16 @@ public final class KvManager extends TabletManagerBase {
 
     private final FileSystem remoteFileSystem;
 
+    /**
+     * The shared rate limiter for all RocksDB instances to control flush and 
compaction write rate.
+     */
+    private final RateLimiter sharedRocksDBRateLimiter;
+
+    /** Current shared rate limiter configuration in bytes per second. */
+    private volatile long currentSharedRateLimitBytesPerSec;
+
+    private volatile boolean isShutdown = false;
+
     private KvManager(
             File dataDir,
             Configuration conf,
@@ -105,6 +154,27 @@ public final class KvManager extends TabletManagerBase {
         this.remoteKvDir = FlussPaths.remoteKvDir(conf);
         this.remoteFileSystem = remoteKvDir.getFileSystem();
         this.serverMetricGroup = tabletServerMetricGroup;
+        this.sharedRocksDBRateLimiter = createSharedRateLimiter(conf);
+        this.currentSharedRateLimitBytesPerSec =
+                
conf.get(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC).getBytes();
+    }
+
+    /** Creates the rate limiter that all RocksDB instances of this KvManager share. */
+    private static RateLimiter createSharedRateLimiter(Configuration conf) {
+        long configuredBytesPerSec =
+                conf.get(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC).getBytes();
+        RocksDB.loadLibrary();
+
+        // The limiter is created unconditionally, whatever rate is configured, so that
+        // changing the rate at runtime never has to enable or disable rate limiting.
+        // Refill period and fairness are the RocksDB defaults; only writes are limited and
+        // the trailing flag (auto-tuning in RocksDB's Java API) stays off.
+        return new RateLimiter(
+                configuredBytesPerSec,
+                RateLimiter.DEFAULT_REFILL_PERIOD_MICROS,
+                RateLimiter.DEFAULT_FAIRNESS,
+                RateLimiterMode.WRITES_ONLY,
+                false);
     }
 
     public static KvManager create(
@@ -130,6 +200,7 @@ public final class KvManager extends TabletManagerBase {
 
     public void shutdown() {
         LOG.info("Shutting down KvManager");
+        isShutdown = true;
         List<KvTablet> kvs = new ArrayList<>(currentKvs.values());
         for (KvTablet kvTablet : kvs) {
             try {
@@ -140,6 +211,9 @@ public final class KvManager extends TabletManagerBase {
         }
         arrowBufferAllocator.close();
         memorySegmentPool.close();
+        if (sharedRocksDBRateLimiter != null) {
+            sharedRocksDBRateLimiter.close();
+        }
         LOG.info("Shut down KvManager complete.");
     }
 
@@ -176,6 +250,8 @@ public final class KvManager extends TabletManagerBase {
                     RowMerger merger = RowMerger.create(tableConfig, kvFormat);
                     KvTablet tablet =
                             KvTablet.create(
+                                    tablePath,
+                                    tableBucket,
                                     logTablet,
                                     tabletDir,
                                     conf,
@@ -186,7 +262,8 @@ public final class KvManager extends TabletManagerBase {
                                     merger,
                                     arrowCompressionInfo,
                                     schemaGetter,
-                                    tableConfig.getChangelogImage());
+                                    tableConfig.getChangelogImage(),
+                                    sharedRocksDBRateLimiter);
                     currentKvs.put(tableBucket, tablet);
 
                     LOG.info(
@@ -294,7 +371,8 @@ public final class KvManager extends TabletManagerBase {
                         rowMerger,
                         tableConfig.getArrowCompressionInfo(),
                         schemaGetter,
-                        tableConfig.getChangelogImage());
+                        tableConfig.getChangelogImage(),
+                        sharedRocksDBRateLimiter);
         if (this.currentKvs.containsKey(tableBucket)) {
             throw new IllegalStateException(
                     String.format(
@@ -327,4 +405,55 @@ public final class KvManager extends TabletManagerBase {
                     e);
         }
     }
+
+    // ============ ServerReconfigurable Implementation ============
+
+    @Override
+    public void validate(Configuration newConfig) throws ConfigException {
+        // Only runtime state is checked here; validating the config values themselves is left
+        // to KvConfigValidator (not part of this class -- see its registration on the servers).
+        boolean acceptsReconfiguration = !isShutdown;
+        if (acceptsReconfiguration) {
+            LOG.debug("KvManager runtime state validation passed for reconfiguration");
+            return;
+        }
+
+        // A KvManager that is shutting down must not be reconfigured anymore.
+        throw new ConfigException("Cannot reconfigure KvManager during shutdown");
+    }
+
+    @Override
+    public void reconfigure(Configuration newConfig) throws ConfigException {
+        long requestedBytesPerSec =
+                newConfig.get(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC).getBytes();
+        long previousBytesPerSec = currentSharedRateLimitBytesPerSec;
+
+        // Nothing to apply when the requested rate is the one already in effect.
+        boolean unchanged = requestedBytesPerSec == previousBytesPerSec;
+        if (unchanged) {
+            LOG.debug(
+                    "Shared RocksDB rate limiter config unchanged: {} bytes/sec",
+                    requestedBytesPerSec);
+            return;
+        }
+
+        try {
+            // Adjust the rate of the existing limiter instance instead of creating a new one,
+            // and remember the applied rate for the next comparison.
+            sharedRocksDBRateLimiter.setBytesPerSecond(requestedBytesPerSec);
+            currentSharedRateLimitBytesPerSec = requestedBytesPerSec;
+
+            LOG.info(
+                    "Shared RocksDB rate limiter reconfigured: {} bytes/sec ({}) -> {} bytes/sec ({})",
+                    previousBytesPerSec,
+                    new MemorySize(previousBytesPerSec).toHumanReadableString(),
+                    requestedBytesPerSec,
+                    new MemorySize(requestedBytesPerSec).toHumanReadableString());
+
+        } catch (Exception e) {
+            // Any failure is surfaced as ConfigException to trigger the rollback of the update.
+            throw new ConfigException(
+                    "Failed to reconfigure shared RocksDB rate limiter: " + e.getMessage(), e);
+        }
+    }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
index 1f1fb5f0f..0dde0a53d 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
@@ -67,9 +67,8 @@ import 
org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
 import org.apache.fluss.types.RowType;
 import org.apache.fluss.utils.BytesUtils;
 import org.apache.fluss.utils.FileUtils;
-import org.apache.fluss.utils.FlussPaths;
-import org.apache.fluss.utils.types.Tuple2;
 
+import org.rocksdb.RateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -161,37 +160,6 @@ public final class KvTablet {
         this.changelogImage = changelogImage;
     }
 
-    public static KvTablet create(
-            LogTablet logTablet,
-            File kvTabletDir,
-            Configuration serverConf,
-            TabletServerMetricGroup serverMetricGroup,
-            BufferAllocator arrowBufferAllocator,
-            MemorySegmentPool memorySegmentPool,
-            KvFormat kvFormat,
-            RowMerger rowMerger,
-            ArrowCompressionInfo arrowCompressionInfo,
-            SchemaGetter schemaGetter,
-            ChangelogImage changelogImage)
-            throws IOException {
-        Tuple2<PhysicalTablePath, TableBucket> tablePathAndBucket =
-                FlussPaths.parseTabletDir(kvTabletDir);
-        return create(
-                tablePathAndBucket.f0,
-                tablePathAndBucket.f1,
-                logTablet,
-                kvTabletDir,
-                serverConf,
-                serverMetricGroup,
-                arrowBufferAllocator,
-                memorySegmentPool,
-                kvFormat,
-                rowMerger,
-                arrowCompressionInfo,
-                schemaGetter,
-                changelogImage);
-    }
-
     public static KvTablet create(
             PhysicalTablePath tablePath,
             TableBucket tableBucket,
@@ -205,9 +173,10 @@ public final class KvTablet {
             RowMerger rowMerger,
             ArrowCompressionInfo arrowCompressionInfo,
             SchemaGetter schemaGetter,
-            ChangelogImage changelogImage)
+            ChangelogImage changelogImage,
+            RateLimiter sharedRateLimiter)
             throws IOException {
-        RocksDBKv kv = buildRocksDBKv(serverConf, kvTabletDir);
+        RocksDBKv kv = buildRocksDBKv(serverConf, kvTabletDir, 
sharedRateLimiter);
         return new KvTablet(
                 tablePath,
                 tableBucket,
@@ -226,10 +195,11 @@ public final class KvTablet {
                 changelogImage);
     }
 
-    private static RocksDBKv buildRocksDBKv(Configuration configuration, File 
kvDir)
+    private static RocksDBKv buildRocksDBKv(
+            Configuration configuration, File kvDir, RateLimiter 
sharedRateLimiter)
             throws IOException {
         RocksDBResourceContainer rocksDBResourceContainer =
-                new RocksDBResourceContainer(configuration, kvDir);
+                new RocksDBResourceContainer(configuration, kvDir, false, 
sharedRateLimiter);
         RocksDBKvBuilder rocksDBKvBuilder =
                 new RocksDBKvBuilder(
                         kvDir,
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainer.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainer.java
index d437c0878..4495613e6 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainer.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainer.java
@@ -22,6 +22,7 @@ import org.apache.fluss.config.ConfigOption;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.config.ReadableConfig;
+import org.apache.fluss.server.kv.KvManager;
 import org.apache.fluss.utils.FileUtils;
 import org.apache.fluss.utils.IOUtils;
 
@@ -33,6 +34,7 @@ import org.rocksdb.CompressionType;
 import org.rocksdb.DBOptions;
 import org.rocksdb.InfoLogLevel;
 import org.rocksdb.PlainTableConfig;
+import org.rocksdb.RateLimiter;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.Statistics;
 import org.rocksdb.TableFormatConfig;
@@ -48,6 +50,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
 /* This file is based on source code of Apache Flink Project 
(https://flink.apache.org/), licensed by the Apache
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
  * additional information regarding copyright ownership. */
@@ -73,22 +77,33 @@ public class RocksDBResourceContainer implements 
AutoCloseable {
 
     private final boolean enableStatistics;
 
+    /** The shared rate limiter for all RocksDB instances. */
+    private final RateLimiter sharedRateLimiter;
+
     /** The handles to be closed when the container is closed. */
     private final ArrayList<AutoCloseable> handlesToClose;
 
     @VisibleForTesting
     RocksDBResourceContainer() {
-        this(new Configuration(), null, false);
+        this(new Configuration(), null, false, 
KvManager.getDefaultRateLimiter());
     }
 
     public RocksDBResourceContainer(ReadableConfig configuration, @Nullable 
File instanceBasePath) {
-        this(configuration, instanceBasePath, false);
+        this(configuration, instanceBasePath, false, 
KvManager.getDefaultRateLimiter());
     }
 
     public RocksDBResourceContainer(
             ReadableConfig configuration,
             @Nullable File instanceBasePath,
             boolean enableStatistics) {
+        this(configuration, instanceBasePath, enableStatistics, 
KvManager.getDefaultRateLimiter());
+    }
+
+    public RocksDBResourceContainer(
+            ReadableConfig configuration,
+            @Nullable File instanceBasePath,
+            boolean enableStatistics,
+            RateLimiter sharedRateLimiter) {
         this.configuration = configuration;
 
         this.instanceRocksDBPath =
@@ -96,6 +111,8 @@ public class RocksDBResourceContainer implements 
AutoCloseable {
                         ? 
RocksDBKvBuilder.getInstanceRocksDBPath(instanceBasePath)
                         : null;
         this.enableStatistics = enableStatistics;
+        this.sharedRateLimiter =
+                checkNotNull(sharedRateLimiter, "sharedRateLimiter must not be 
null");
 
         this.handlesToClose = new ArrayList<>();
     }
@@ -117,6 +134,9 @@ public class RocksDBResourceContainer implements 
AutoCloseable {
         // add necessary default options
         opt = opt.setCreateIfMissing(true);
 
+        // set shared rate limiter
+        opt.setRateLimiter(sharedRateLimiter);
+
         if (enableStatistics) {
             Statistics statistics = new Statistics();
             opt.setStatistics(statistics);
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java 
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
index bd185ec62..8eed63c84 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
@@ -220,7 +220,6 @@ public class TabletServer extends ServerBase {
                     new MetadataManager(zkClient, conf, 
lakeCatalogDynamicLoader);
             this.dynamicConfigManager = new DynamicConfigManager(zkClient, 
conf, false);
             dynamicConfigManager.register(lakeCatalogDynamicLoader);
-            dynamicConfigManager.startup();
 
             this.metadataCache = new 
TabletServerMetadataCache(metadataManager);
 
@@ -231,6 +230,11 @@ public class TabletServer extends ServerBase {
             this.kvManager = KvManager.create(conf, zkClient, logManager, 
tabletServerMetricGroup);
             kvManager.startup();
 
+            // Register kvManager to dynamicConfigManager for dynamic 
reconfiguration
+            dynamicConfigManager.register(kvManager);
+            // Start dynamicConfigManager after all reconfigurable components 
are registered
+            dynamicConfigManager.startup();
+
             this.authorizer = AuthorizerLoader.createAuthorizer(conf, 
zkClient, pluginManager);
             if (authorizer != null) {
                 authorizer.startup();
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java
index b19ff40e1..2ba44ac41 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java
@@ -46,6 +46,7 @@ import static 
org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
 import static org.apache.fluss.metadata.DataLakeFormat.PAIMON;
 import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link DynamicConfigManager}. */
@@ -173,7 +174,7 @@ public class DynamicConfigChangeTest {
                                                             "unknown",
                                                             
AlterConfigOpType.SET))))
                     .hasMessageContaining(
-                            "Could not parse value 'unknown' for key 
'datalake.format'");
+                            "Cannot parse 'unknown' as DataLakeFormat for 
config 'datalake.format'");
 
             
assertThat(lakeCatalogDynamicLoader.getLakeCatalogContainer().getDataLakeFormat())
                     .isNull();
@@ -256,4 +257,94 @@ public class DynamicConfigChangeTest {
                     .isEqualTo(PAIMON);
         }
     }
+
+    @Test
+    void testPreventInvalidConfig() throws Exception {
+        // Test that generic type validation prevents invalid config values
+        Configuration configuration = new Configuration();
+        
configuration.setString(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(),
 "100MB");
+
+        DynamicConfigManager dynamicConfigManager =
+                new DynamicConfigManager(zookeeperClient, configuration, true);
+        dynamicConfigManager.startup();
+
+        // Try to set rate limiter to an invalid value - should be rejected by 
generic type
+        // validation
+        assertThatThrownBy(
+                        () ->
+                                dynamicConfigManager.alterConfigs(
+                                        Collections.singletonList(
+                                                new AlterConfig(
+                                                        ConfigOptions
+                                                                
.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC
+                                                                .key(),
+                                                        "invalid_value",
+                                                        
AlterConfigOpType.SET))))
+                .isInstanceOf(ConfigException.class)
+                .hasMessageContaining(
+                        "Cannot parse 'invalid_value' as MemorySize for config 
'kv.rocksdb.shared-rate-limiter.bytes-per-sec'");
+    }
+
+    @Test
+    void testConfigValidatorAllowsValidChange() throws Exception {
+        // Test that generic type validation allows valid config values
+        Configuration configuration = new Configuration();
+        
configuration.setString(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(),
 "100MB");
+
+        DynamicConfigManager dynamicConfigManager =
+                new DynamicConfigManager(zookeeperClient, configuration, true);
+        dynamicConfigManager.startup();
+
+        // Adjust rate limiter value - should succeed
+        assertThatCode(
+                        () ->
+                                dynamicConfigManager.alterConfigs(
+                                        Collections.singletonList(
+                                                new AlterConfig(
+                                                        ConfigOptions
+                                                                
.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC
+                                                                .key(),
+                                                        "200MB",
+                                                        
AlterConfigOpType.SET))))
+                .doesNotThrowAnyException();
+
+        // Verify config was persisted to ZK
+        Map<String, String> zkConfig = zookeeperClient.fetchEntityConfig();
+        
assertThat(zkConfig.get(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()))
+                .isEqualTo("200MB");
+    }
+
+    @Test
+    void testConfigValidatorWithMultipleValidators() throws Exception {
+        Configuration configuration = new Configuration();
+        
configuration.setString(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(),
 "100MB");
+
+        try (LakeCatalogDynamicLoader lakeCatalogDynamicLoader =
+                new LakeCatalogDynamicLoader(configuration, null, true)) {
+            DynamicConfigManager dynamicConfigManager =
+                    new DynamicConfigManager(zookeeperClient, configuration, 
true);
+
+            // Register reconfigurables - generic type validation works 
automatically
+            dynamicConfigManager.register(lakeCatalogDynamicLoader);
+            dynamicConfigManager.startup();
+
+            // Change multiple configs - generic validation applies to all
+            dynamicConfigManager.alterConfigs(
+                    Arrays.asList(
+                            new AlterConfig(
+                                    
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(),
+                                    "200MB",
+                                    AlterConfigOpType.SET),
+                            new AlterConfig(
+                                    DATALAKE_FORMAT.key(), "paimon", 
AlterConfigOpType.SET)));
+
+            // Verify both configs were applied
+            Map<String, String> zkConfig = zookeeperClient.fetchEntityConfig();
+            
assertThat(zkConfig.get(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()))
+                    .isEqualTo("200MB");
+            
assertThat(zkConfig.get(DATALAKE_FORMAT.key())).isEqualTo("paimon");
+            
assertThat(lakeCatalogDynamicLoader.getLakeCatalogContainer().getDataLakeFormat())
+                    .isEqualTo(PAIMON);
+        }
+    }
 }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
index 94d41bffe..06db30b34 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
@@ -191,7 +191,8 @@ class KvTabletTest {
                 rowMerger,
                 DEFAULT_COMPRESSION,
                 schemaGetter,
-                tableConf.getChangelogImage());
+                tableConf.getChangelogImage(),
+                KvManager.getDefaultRateLimiter());
     }
 
     @Test
diff --git a/website/docs/engine-flink/datastream.mdx 
b/website/docs/engine-flink/datastream.mdx
index c9aaa3798..a578cdaac 100644
--- a/website/docs/engine-flink/datastream.mdx
+++ b/website/docs/engine-flink/datastream.mdx
@@ -1,6 +1,6 @@
 ---
 title: "DataStream API"
-sidebar_position: 7
+sidebar_position: 8
 ---
 
 # DataStream API
diff --git a/website/docs/engine-flink/delta-joins.md 
b/website/docs/engine-flink/delta-joins.md
index dea8367f0..deeb9ed74 100644
--- a/website/docs/engine-flink/delta-joins.md
+++ b/website/docs/engine-flink/delta-joins.md
@@ -1,7 +1,7 @@
 ---
 sidebar_label: Delta Joins
 title: Flink Delta Joins
-sidebar_position: 6
+sidebar_position: 7
 ---
 
 # The Delta Join
diff --git a/website/docs/engine-flink/getting-started.md 
b/website/docs/engine-flink/getting-started.md
index f8573454c..f2527774c 100644
--- a/website/docs/engine-flink/getting-started.md
+++ b/website/docs/engine-flink/getting-started.md
@@ -34,6 +34,7 @@ For Flink's Table API, Fluss supports the following features:
 | [SQL Show Partitions](ddl.md#show-partitions)     | ✔️    |                  
                      |
 | [SQL Add Partition](ddl.md#add-partition)         | ✔️    |                  
                      |
 | [SQL Drop Partition](ddl.md#drop-partition)       | ✔️    |                  
                      |
+| [Procedures](procedures.md)                       | ✔️    | ACL management 
and cluster configuration |
 | [SQL Select](reads.md)                            | ✔️    | Support both 
streaming and batch mode. |
 | [SQL Limit](reads.md#limit-read)                  | ✔️    | Only for Log 
Table                     |
 | [SQL Insert Into](writes.md)                      | ✔️    | Support both 
streaming and batch mode. |
diff --git a/website/docs/engine-flink/lookups.md 
b/website/docs/engine-flink/lookups.md
index f4e5abab1..835ee9492 100644
--- a/website/docs/engine-flink/lookups.md
+++ b/website/docs/engine-flink/lookups.md
@@ -1,7 +1,7 @@
 ---
 sidebar_label: Lookups
 title: Flink Lookup Joins
-sidebar_position: 5
+sidebar_position: 6
 ---
 
 # Flink Lookup Joins
diff --git a/website/docs/engine-flink/options.md 
b/website/docs/engine-flink/options.md
index d6462fbfa..2fecb25dd 100644
--- a/website/docs/engine-flink/options.md
+++ b/website/docs/engine-flink/options.md
@@ -1,6 +1,6 @@
 ---
 title: Connector Options
-sidebar_position: 8
+sidebar_position: 9
 ---
 
 # Connector Options
diff --git a/website/docs/engine-flink/procedures.md 
b/website/docs/engine-flink/procedures.md
new file mode 100644
index 000000000..ac6311334
--- /dev/null
+++ b/website/docs/engine-flink/procedures.md
@@ -0,0 +1,255 @@
+---
+sidebar_label: Procedures
+title: Procedures
+sidebar_position: 3
+---
+
+# Procedures
+
+Fluss provides stored procedures to perform administrative and management 
operations through Flink SQL. All procedures are located in the `sys` namespace 
and can be invoked using the `CALL` statement.
+
+## Available Procedures
+
+You can list all available procedures using:
+
+```sql title="Flink SQL"
+SHOW PROCEDURES;
+```
+
+## Access Control Procedures
+
+Fluss provides procedures to manage Access Control Lists (ACLs) for security 
and authorization. See the [Security](../security/overview.md) documentation 
for more details.
+
+### add_acl
+
+Add an ACL entry to grant permissions to a principal.
+
+**Syntax:**
+
+```sql
+CALL [catalog_name.]sys.add_acl(
+  resource => 'STRING',
+  permission => 'STRING', 
+  principal => 'STRING',
+  operation => 'STRING',
+  host => 'STRING'  -- optional, defaults to '*'
+)
+```
+
+**Parameters:**
+
+- `resource` (required): The resource to grant permissions on. Can be 
`'CLUSTER'` for cluster-level permissions or a specific resource name (e.g., 
database or table name).
+- `permission` (required): The permission type to grant. Valid values are 
`'ALLOW'` or `'DENY'`.
+- `principal` (required): The principal to grant permissions to, in the format 
`'Type:Name'` (e.g., `'User:Alice'`).
+- `operation` (required): The operation type to grant. Valid values include 
`'READ'`, `'WRITE'`, `'CREATE'`, `'DELETE'`, `'ALTER'`, `'DESCRIBE'`, 
`'CLUSTER_ACTION'`, `'IDEMPOTENT_WRITE'`.
+- `host` (optional): The host from which the principal can access the 
resource. Defaults to `'*'` (all hosts).
+
+**Example:**
+
+```sql title="Flink SQL"
+-- Use the Fluss catalog (replace 'fluss_catalog' with your catalog name if 
different)
+USE fluss_catalog;
+
+-- Grant read permission to user Alice from any host
+CALL sys.add_acl(
+  resource => 'CLUSTER',
+  permission => 'ALLOW',
+  principal => 'User:Alice',
+  operation => 'READ',
+  host => '*'
+);
+
+-- Grant write permission to user Bob from a specific host
+CALL sys.add_acl(
+  resource => 'my_database.my_table',
+  permission => 'ALLOW',
+  principal => 'User:Bob',
+  operation => 'WRITE',
+  host => '192.168.1.100'
+);
+```
+
+### drop_acl
+
+Remove an ACL entry to revoke permissions.
+
+**Syntax:**
+
+```sql
+CALL [catalog_name.]sys.drop_acl(
+  resource => 'STRING',
+  permission => 'STRING',
+  principal => 'STRING', 
+  operation => 'STRING',
+  host => 'STRING'  -- optional, defaults to '*'
+)
+```
+
+**Parameters:**
+
+All parameters accept the same values as `add_acl`. You can use `'ANY'` as a 
wildcard value to match multiple entries for batch deletion.
+
+**Example:**
+
+```sql title="Flink SQL"
+-- Use the Fluss catalog (replace 'fluss_catalog' with your catalog name if 
different)
+USE fluss_catalog;
+
+-- Remove a specific ACL entry
+CALL sys.drop_acl(
+  resource => 'CLUSTER',
+  permission => 'ALLOW',
+  principal => 'User:Alice',
+  operation => 'READ',
+  host => '*'
+);
+
+-- Remove all ACL entries for a specific user
+CALL sys.drop_acl(
+  resource => 'ANY',
+  permission => 'ANY',
+  principal => 'User:Alice',
+  operation => 'ANY',
+  host => 'ANY'
+);
+```
+
+### list_acl
+
+List ACL entries matching the specified filters.
+
+**Syntax:**
+
+```sql
+CALL [catalog_name.]sys.list_acl(
+  resource => 'STRING',
+  permission => 'STRING',  -- optional, defaults to 'ANY'
+  principal => 'STRING',   -- optional, defaults to 'ANY'
+  operation => 'STRING',   -- optional, defaults to 'ANY'
+  host => 'STRING'         -- optional, defaults to 'ANY'
+)
+```
+
+**Parameters:**
+
+All parameters accept the same values as `add_acl`. Use `'ANY'` as a wildcard 
to match all values for that parameter.
+
+**Returns:** An array of strings, each representing an ACL entry in the 
format: 
`resource="...";permission="...";principal="...";operation="...";host="..."`
+
+**Example:**
+
+```sql title="Flink SQL"
+-- Use the Fluss catalog (replace 'fluss_catalog' with your catalog name if 
different)
+USE fluss_catalog;
+
+-- List all ACL entries
+CALL sys.list_acl(resource => 'ANY');
+
+-- List all ACL entries for a specific user
+CALL sys.list_acl(
+  resource => 'ANY',
+  principal => 'User:Alice'
+);
+
+-- List all read permissions
+CALL sys.list_acl(
+  resource => 'ANY',
+  operation => 'READ'
+);
+```
+
+## Cluster Configuration Procedures
+
+Fluss provides procedures to dynamically manage cluster configurations without 
requiring a server restart.
+
+### get_cluster_config
+
+Retrieve cluster configuration values.
+
+**Syntax:**
+
+```sql
+-- Get a specific configuration
+CALL [catalog_name.]sys.get_cluster_config(config_key => 'STRING')
+
+-- Get all cluster configurations
+CALL [catalog_name.]sys.get_cluster_config()
+```
+
+**Parameters:**
+
+- `config_key` (optional): The configuration key to retrieve. If omitted, 
returns all cluster configurations.
+
+**Returns:** A table with columns:
+- `config_key`: The configuration key name
+- `config_value`: The current value
+- `config_source`: The source of the configuration (e.g., `DYNAMIC_CONFIG`, 
`STATIC_CONFIG`)
+
+**Example:**
+
+```sql title="Flink SQL"
+-- Use the Fluss catalog (replace 'fluss_catalog' with your catalog name if 
different)
+USE fluss_catalog;
+
+-- Get a specific configuration
+CALL sys.get_cluster_config(
+  config_key => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec'
+);
+
+-- Get all cluster configurations
+CALL sys.get_cluster_config();
+```
+
+### set_cluster_config
+
+Set or delete a cluster configuration dynamically.
+
+**Syntax:**
+
+```sql
+-- Set a configuration value
+CALL [catalog_name.]sys.set_cluster_config(
+  config_key => 'STRING',
+  config_value => 'STRING'
+)
+
+-- Delete a configuration (reset to default)
+CALL [catalog_name.]sys.set_cluster_config(config_key => 'STRING')
+```
+
+**Parameters:**
+
+- `config_key` (required): The configuration key to modify.
+- `config_value` (optional): The new value to set. If omitted or empty, the 
configuration is deleted (reset to default).
+
+**Important Notes:**
+
+- Changes are validated before being applied and persisted in ZooKeeper
+- Changes are automatically applied to all servers (Coordinator and 
TabletServers)
+- Changes survive server restarts
+- Not all configurations support dynamic changes. The server will reject 
invalid modifications
+
+**Example:**
+
+```sql title="Flink SQL"
+-- Use the Fluss catalog (replace 'fluss_catalog' with your catalog name if 
different)
+USE fluss_catalog;
+
+-- Set RocksDB rate limiter
+CALL sys.set_cluster_config(
+  config_key => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec',
+  config_value => '200MB'
+);
+
+-- Set datalake format
+CALL sys.set_cluster_config(
+  config_key => 'datalake.format',
+  config_value => 'paimon'
+);
+
+-- Delete a configuration (reset to default)
+CALL sys.set_cluster_config(
+  config_key => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec'
+);
+```
+
diff --git a/website/docs/engine-flink/reads.md 
b/website/docs/engine-flink/reads.md
index bcb2c2475..4cab45fbd 100644
--- a/website/docs/engine-flink/reads.md
+++ b/website/docs/engine-flink/reads.md
@@ -1,7 +1,7 @@
 ---
 sidebar_label: Reads
 title: Flink Reads
-sidebar_position: 4
+sidebar_position: 5
 ---
 
 # Flink Reads
diff --git a/website/docs/engine-flink/writes.md 
b/website/docs/engine-flink/writes.md
index 29af716fd..354aa9747 100644
--- a/website/docs/engine-flink/writes.md
+++ b/website/docs/engine-flink/writes.md
@@ -1,7 +1,7 @@
 ---
 sidebar_label: Writes
 title: Flink Writes
-sidebar_position: 3
+sidebar_position: 4
 ---
 
 # Flink Writes
diff --git a/website/docs/maintenance/configuration.md 
b/website/docs/maintenance/configuration.md
index f6988aace..97da3457d 100644
--- a/website/docs/maintenance/configuration.md
+++ b/website/docs/maintenance/configuration.md
@@ -157,6 +157,7 @@ during the Fluss cluster working.
 | kv.rocksdb.use-bloom-filter                       | Boolean    | true        
                  | If true, every newly created SST file will contain a Bloom 
filter. It is enabled by default.                                               
                                                                                
                                                                                
                                                                                
               [...]
 | kv.rocksdb.bloom-filter.bits-per-key              | Double     | 10.0        
                  | Bits per key that bloom filter will use, this only take 
effect when bloom filter is used. The default value is 10.0.                    
                                                                                
                                                                                
                                                                                
                  [...]
 | kv.rocksdb.bloom-filter.block-based-mode          | Boolean    | false       
                  | If true, RocksDB will use block-based filter instead of 
full filter, this only take effect when bloom filter is used. The default value 
is `false`.                                                                     
                                                                                
                                                                                
                  [...]
+| kv.rocksdb.shared-rate-limiter.bytes-per-sec              | MemorySize | 
Long.MAX_VALUE                | The bytes per second rate limit for RocksDB 
flush and compaction operations shared across all RocksDB instances on the 
TabletServer. The rate limiter is always enabled. The default value is 
Long.MAX_VALUE (effectively unlimited). Set to a lower value (e.g., 100MB) to 
limit the rate. This configuration can be updated dynamically without server 
restart. See [Updating Configs](operatio [...]
 | kv.recover.log-record-batch.max-size              | MemorySize | 16mb        
                  | The max fetch size for fetching log to apply to kv during 
recovering kv.                                                                  
                                                                                
                                                                                
                                                                                
                [...]
 
 ## Metrics
diff --git a/website/docs/maintenance/operations/updating-configs.md 
b/website/docs/maintenance/operations/updating-configs.md
index 0643a46fe..8b7a6a1e1 100644
--- a/website/docs/maintenance/operations/updating-configs.md
+++ b/website/docs/maintenance/operations/updating-configs.md
@@ -15,9 +15,13 @@ From Fluss version 0.8 onwards, some of the server configs 
can be updated withou
 Currently, the supported dynamically updatable server configurations include:
 - `datalake.format`: Enable lakehouse storage by specifying the lakehouse 
format, e.g., `paimon`, `iceberg`.
 - Options with prefix `datalake.${datalake.format}`
+- `kv.rocksdb.shared-rate-limiter.bytes-per-sec`: Control RocksDB flush and 
compaction write rate shared across all RocksDB instances on the TabletServer. 
The rate limiter is always enabled. Set to a lower value (e.g., 100MB) to limit 
the rate, or a very high value to effectively disable rate limiting.
 
 
-You can update the configuration of a cluster with [Java 
client](apis/java-client.md).
+You can update the configuration of a cluster with [Java 
client](#using-java-client) or [Flink 
Procedures](../../engine-flink/procedures.md#cluster-configuration-procedures).
+
+### Using Java Client
+
 Here is a code snippet to demonstrate how to update the cluster configurations 
using the Java Client:
 
 ```java
@@ -30,6 +34,11 @@ admin.alterClusterConfigs(
 admin.alterClusterConfigs(
         Collections.singletonList(
                 new AlterConfig(DATALAKE_FORMAT.key(), "paimon", 
AlterConfigOpType.DELETE)));
+
+// Set RocksDB shared rate limiter to 200MB/sec
+admin.alterClusterConfigs(
+        Collections.singletonList(
+                new AlterConfig(KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(), 
"200MB", AlterConfigOpType.SET)));
 ```
 
 The `AlterConfig` class contains three properties:
@@ -37,6 +46,9 @@ The `AlterConfig` class contains three properties:
 * `value`: The configuration value to be set (e.g., `paimon`)
 * `opType`: The operation type, either `AlterConfigOpType.SET` or 
`AlterConfigOpType.DELETE`
 
+### Using Flink Stored Procedures
+
+For certain configurations, Fluss provides convenient Flink stored procedures 
that can be called directly from Flink SQL. See 
[Procedures](../../engine-flink/procedures.md#cluster-configuration-procedures) for 
detailed documentation on using the `get_cluster_config` and `set_cluster_config` 
procedures.
 
 ## Updating Table Configs
 


Reply via email to