wuchong commented on code in PR #2178:
URL: https://github.com/apache/fluss/pull/2178#discussion_r2637686411


##########
website/docs/maintenance/operations/updating-configs.md:
##########
@@ -30,13 +34,50 @@ admin.alterClusterConfigs(
 admin.alterClusterConfigs(
         Collections.singletonList(
                 new AlterConfig(DATALAKE_FORMAT.key(), "paimon", 
AlterConfigOpType.DELETE)));
+
+// Set RocksDB shared rate limiter to 200MB/sec
+admin.alterClusterConfigs(
+        Collections.singletonList(
+                new AlterConfig("kv.shared-rate-limiter-bytes-per-sec", 
"209715200", AlterConfigOpType.SET)));
 ```
 
 The `AlterConfig` class contains three properties:
 * `key`: The configuration key to be modified (e.g., `datalake.format`)
 * `value`: The configuration value to be set (e.g., `paimon`)
 * `opType`: The operation type, either `AlterConfigOpType.SET` or 
`AlterConfigOpType.DELETE`
 
+### Using Flink Stored Procedures
+
+For certain configurations, Fluss provides convenient Flink stored procedures 
that can be called directly from Flink SQL.
+
+#### Managing RocksDB Rate Limiter
+
+You can dynamically adjust the shared RocksDB rate limiter for all 
TabletServers using Flink stored procedures:
+
+**Set Shared RocksDB Rate Limiter:**
+```sql title="Flink SQL"
+-- Set rate limiter to 200MB/sec (recommended, use named argument, only 
supported since Flink 1.19)
+CALL fluss_catalog.sys.set_shared_rocksdb_rate_limiter(rate_limit => '200MB');
+
+-- Set rate limiter to 500MB/sec (use indexed argument)
+CALL fluss_catalog.sys.set_shared_rocksdb_rate_limiter('500MB');

Review Comment:
   It seems this procedure is not supported yet. Should the documentation use `set_cluster_config` instead?



##########
website/docs/maintenance/operations/updating-configs.md:
##########
@@ -15,9 +15,13 @@ From Fluss version 0.8 onwards, some of the server configs 
can be updated withou
 Currently, the supported dynamically updatable server configurations include:
 - `datalake.format`: Enable lakehouse storage by specifying the lakehouse 
format, e.g., `paimon`, `iceberg`.
 - Options with prefix `datalake.${datalake.format}`
+- `kv.shared-rate-limiter-bytes-per-sec`: Control RocksDB flush and compaction 
write rate shared across all RocksDB instances on the TabletServer.
 
 
-You can update the configuration of a cluster with [Java 
client](apis/java-client.md).
+You can update the configuration of a cluster with [Java 
client](apis/java-client.md) or [Flink Stored 
Procedures](#using-flink-stored-procedures).

Review Comment:
   ```suggestion
   You can update the configuration of a cluster with [Java 
client](#using-java-client) or [Flink Stored 
Procedures](#using-flink-stored-procedures).
   ```



##########
website/docs/maintenance/configuration.md:
##########
@@ -156,6 +156,7 @@ during the Fluss cluster working.
 | kv.rocksdb.use-bloom-filter | Boolean | true | If true, every newly created SST file will contain a Bloom filter. It is enabled by default. |
 | kv.rocksdb.bloom-filter.bits-per-key | Double | 10.0 | Bits per key that bloom filter will use, this only take effect when bloom filter is used. The default value is 10.0. |
 | kv.rocksdb.bloom-filter.block-based-mode | Boolean | false | If true, RocksDB will use block-based filter instead of full filter, this only take effect when bloom filter is used. The default value is `false`. |
+| kv.shared-rate-limiter-bytes-per-sec | MemorySize | 0b | The bytes per second rate limit for RocksDB flush and compaction operations shared across all RocksDB instances on the TabletServer. A value of 0 (default) means rate limiting is disabled. This configuration can be updated dynamically without server restart. See [Updating Configs](operations/updating-configs.md) for more details. |

Review Comment:
   Update the config key and the default value in this row.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigProcedure.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.procedure;
+
+import org.apache.fluss.config.cluster.ConfigEntry;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.types.Row;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Procedure to get cluster configuration(s).
+ *
+ * <p>This procedure allows querying dynamic cluster configurations. It can 
retrieve:
+ *
+ * <ul>
+ *   <li>A specific configuration key
+ *   <li>All configurations (when key parameter is null or empty)
+ * </ul>
+ *
+ * <p>Usage examples:
+ *
+ * <pre>
+ * -- Get a specific configuration
+ * CALL sys.get_cluster_config('kv.shared-rate-limiter.bytes-per-sec');
+ *
+ * -- Get all cluster configurations
+ * CALL sys.get_cluster_config();
+ * </pre>
+ */
+public class GetClusterConfigProcedure extends ProcedureBase {
+
+    @ProcedureHint(
+            output =
+                    @DataTypeHint(
+                            "ROW<config_key STRING, config_value STRING, 
config_source STRING>"))
+    public Row[] call(ProcedureContext context) throws Exception {
+        return getConfigs(null);
+    }
+
+    @ProcedureHint(
+            argument = {@ArgumentHint(name = "config_key", type = 
@DataTypeHint("STRING"))},
+            output =
+                    @DataTypeHint(
+                            "ROW<config_key STRING, config_value STRING, 
config_source STRING>"))
+    public Row[] call(ProcedureContext context, String configKey) throws 
Exception {
+        return getConfigs(configKey);
+    }
+
+    private Row[] getConfigs(@Nullable String configKey) throws Exception {
+        try {
+            // Get all cluster configurations
+            Collection<ConfigEntry> configs = 
admin.describeClusterConfigs().get();
+
+            List<Row> results = new ArrayList<>();
+
+            if (configKey == null || configKey.isEmpty()) {
+                // Return all configurations
+                for (ConfigEntry entry : configs) {
+                    results.add(
+                            Row.of(
+                                    entry.key(),
+                                    entry.value(),
+                                    entry.source() != null ? 
entry.source().name() : "UNKNOWN"));
+                }
+
+                if (results.isEmpty()) {
+                    return new Row[] {Row.of("No cluster configurations 
found", null, null)};
+                }
+            } else {
+                // Find specific configuration
+                for (ConfigEntry entry : configs) {
+                    if (entry.key().equals(configKey)) {
+                        results.add(
+                                Row.of(
+                                        entry.key(),
+                                        entry.value(),
+                                        entry.source() != null
+                                                ? entry.source().name()
+                                                : "UNKNOWN"));
+                        break;
+                    }
+                }
+
+                if (results.isEmpty()) {
+                    return new Row[] {
+                        Row.of(
+                                String.format("Configuration key '%s' not 
found", configKey),
+                                null,
+                                null)

Review Comment:
   ditto



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigProcedure.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.procedure;
+
+import org.apache.fluss.config.cluster.ConfigEntry;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.types.Row;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Procedure to get cluster configuration(s).
+ *
+ * <p>This procedure allows querying dynamic cluster configurations. It can 
retrieve:
+ *
+ * <ul>
+ *   <li>A specific configuration key
+ *   <li>All configurations (when key parameter is null or empty)
+ * </ul>
+ *
+ * <p>Usage examples:
+ *
+ * <pre>
+ * -- Get a specific configuration
+ * CALL sys.get_cluster_config('kv.shared-rate-limiter.bytes-per-sec');

Review Comment:
   Replace every occurrence of `kv.shared-rate-limiter.bytes-per-sec` in the docs and Javadocs with `kv.rocksdb.shared-rate-limiter.bytes-per-sec`, which is the actual config key.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigProcedure.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.procedure;
+
+import org.apache.fluss.config.cluster.ConfigEntry;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.types.Row;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Procedure to get cluster configuration(s).
+ *
+ * <p>This procedure allows querying dynamic cluster configurations. It can 
retrieve:
+ *
+ * <ul>
+ *   <li>A specific configuration key
+ *   <li>All configurations (when key parameter is null or empty)
+ * </ul>
+ *
+ * <p>Usage examples:
+ *
+ * <pre>
+ * -- Get a specific configuration
+ * CALL sys.get_cluster_config('kv.shared-rate-limiter.bytes-per-sec');
+ *
+ * -- Get all cluster configurations
+ * CALL sys.get_cluster_config();
+ * </pre>
+ */
+public class GetClusterConfigProcedure extends ProcedureBase {
+
+    @ProcedureHint(
+            output =
+                    @DataTypeHint(
+                            "ROW<config_key STRING, config_value STRING, 
config_source STRING>"))
+    public Row[] call(ProcedureContext context) throws Exception {
+        return getConfigs(null);
+    }
+
+    @ProcedureHint(
+            argument = {@ArgumentHint(name = "config_key", type = 
@DataTypeHint("STRING"))},
+            output =
+                    @DataTypeHint(
+                            "ROW<config_key STRING, config_value STRING, 
config_source STRING>"))
+    public Row[] call(ProcedureContext context, String configKey) throws 
Exception {
+        return getConfigs(configKey);
+    }
+
+    private Row[] getConfigs(@Nullable String configKey) throws Exception {
+        try {
+            // Get all cluster configurations
+            Collection<ConfigEntry> configs = 
admin.describeClusterConfigs().get();
+
+            List<Row> results = new ArrayList<>();
+
+            if (configKey == null || configKey.isEmpty()) {
+                // Return all configurations
+                for (ConfigEntry entry : configs) {
+                    results.add(
+                            Row.of(
+                                    entry.key(),
+                                    entry.value(),
+                                    entry.source() != null ? 
entry.source().name() : "UNKNOWN"));
+                }
+
+                if (results.isEmpty()) {
+                    return new Row[] {Row.of("No cluster configurations 
found", null, null)};

Review Comment:
   If there are no configurations, should this just return an empty result, like a select from an empty table? Otherwise it may be confusing that the config result contains a row:
   
   config_key | config_value | config_source
   -----|-----|-----
   No cluster configurations found | |
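
   A minimal sketch of the empty-result behavior, not the actual procedure code. `EmptyResultSketch` and `getConfigs` are hypothetical names, and `String[]` pairs stand in for Flink's `Row` only to keep the example free of dependencies:

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;

   // Hypothetical stand-in for the procedure's lookup logic (assumption, not Fluss code).
   // Each String[] is a {key, value} pair playing the role of a result row.
   final class EmptyResultSketch {

       /** Returns the rows matching configKey, or every row when configKey is null or empty. */
       static String[][] getConfigs(List<String[]> entries, String configKey) {
           boolean returnAll = configKey == null || configKey.isEmpty();
           List<String[]> results = new ArrayList<>();
           for (String[] entry : entries) {
               if (returnAll || entry[0].equals(configKey)) {
                   results.add(entry);
               }
           }
           // No placeholder row is added: an empty result stays empty,
           // just like a SELECT on an empty table.
           return results.toArray(new String[0][]);
       }

       public static void main(String[] args) {
           List<String[]> entries =
                   Arrays.asList(new String[][] {{"datalake.format", "paimon"}});
           System.out.println(getConfigs(entries, "missing.key").length);
           System.out.println(getConfigs(entries, null).length);
       }
   }
   ```

   With this shape, callers can tell "nothing found" from a real row by checking the result length, and no message text ends up in the `config_key` column.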
   



##########
fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java:
##########
@@ -214,10 +246,13 @@ public static KvTablet create(
                 schemaGetter);
     }
 
-    private static RocksDBKv buildRocksDBKv(Configuration configuration, File 
kvDir)
+    private static RocksDBKv buildRocksDBKv(
+            Configuration configuration,
+            File kvDir,
+            @Nullable org.rocksdb.RateLimiter sharedRateLimiter)

Review Comment:
   Can we replace the qualified class name with `import`?



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigProcedure.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.procedure;
+
+import org.apache.fluss.config.cluster.ConfigEntry;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.types.Row;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Procedure to get cluster configuration(s).
+ *
+ * <p>This procedure allows querying dynamic cluster configurations. It can 
retrieve:
+ *
+ * <ul>
+ *   <li>A specific configuration key
+ *   <li>All configurations (when key parameter is null or empty)
+ * </ul>
+ *
+ * <p>Usage examples:
+ *
+ * <pre>
+ * -- Get a specific configuration
+ * CALL sys.get_cluster_config('kv.shared-rate-limiter.bytes-per-sec');
+ *
+ * -- Get all cluster configurations
+ * CALL sys.get_cluster_config();

Review Comment:
   Could you please add a **“Procedures”** section under the **“DDL”** section 
in the **“Engine Flink”** documentation to list all supported procedures?
   
   This can be tracked in a separate issue if preferred.  
   
   For the format, I’d recommend following the style used by Iceberg, using 
**headers and descriptive text** rather than tables. 
   
   Examples:
   - https://iceberg.apache.org/docs/nightly/spark-procedures/#spark-procedures
   - https://paimon.apache.org/docs/master/flink/procedures/



##########
fluss-server/src/main/java/org/apache/fluss/server/kv/KvConfigValidator.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.MemorySize;
+import org.apache.fluss.config.cluster.ConfigValidator;
+import org.apache.fluss.exception.ConfigException;
+
+import javax.annotation.Nullable;
+
+/**
+ * Stateless validator for RocksDB shared rate limiter configuration.
+ *
+ * <p>This validator can be registered on CoordinatorServer without requiring 
a KvManager instance,
+ * enabling early validation of KV configs before they are persisted to 
ZooKeeper.
+ *
+ * <p>The validator enforces the following rules for RocksDB shared rate 
limiter:
+ *
+ * <ul>
+ *   <li>Cannot enable rate limiter dynamically if it was not enabled at 
server startup
+ *   <li>Cannot disable rate limiter dynamically if it was enabled at server 
startup

Review Comment:
   I don’t fully understand the reason behind this limitation. Is there 
significant overhead in always having a `RateLimiter` enabled in RocksDB, even 
when configured with a very high throughput limit (e.g., 1 TB/s)?  
   
   If the performance cost is negligible, and especially since we’re enabling the rate limiter by default, I’d suggest treating a very high value (or `Long.MAX_VALUE`) as the effective “disabled” state. This also means we don't rely on `0` to disable it.
   
   I think switching between enabled and disabled is a common and useful capability. Otherwise, it is very confusing that the documentation says to set the value to `0` to disable the limiter, but that doesn't work.
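
   A rough sketch of that idea, under the assumption that the limiter always exists and "disabled" is just a very high rate. `SharedRateLimitSketch` and its methods are hypothetical and only hold a number; a real implementation would forward the value to the RocksDB rate limiter instead:

   ```java
   // Hypothetical holder for the shared rate (assumption, not Fluss or RocksDB code).
   final class SharedRateLimitSketch {

       // Sentinel meaning "effectively unlimited": the limiter stays in place
       // but is configured with a rate that is never reached in practice.
       static final long DISABLED = Long.MAX_VALUE;

       private volatile long bytesPerSec = DISABLED;

       /** Applies a new rate; non-positive values are rejected instead of meaning "off". */
       void update(long newBytesPerSec) {
           if (newBytesPerSec <= 0) {
               throw new IllegalArgumentException(
                       "Rate must be positive; use DISABLED to turn throttling off");
           }
           bytesPerSec = newBytesPerSec;
       }

       /** Turns throttling off by switching to the sentinel rate. */
       void disable() {
           update(DISABLED);
       }

       boolean isThrottling() {
           return bytesPerSec != DISABLED;
       }

       long bytesPerSec() {
           return bytesPerSec;
       }

       public static void main(String[] args) {
           SharedRateLimitSketch limit = new SharedRateLimitSketch();
           limit.update(200L * 1024 * 1024);
           System.out.println(limit.isThrottling());
           limit.disable();
           System.out.println(limit.isThrottling());
       }
   }
   ```

   Because enabling and disabling are both plain rate updates here, neither direction needs a restart or a special case in the validator.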



##########
website/docs/maintenance/operations/updating-configs.md:
##########
@@ -30,13 +34,50 @@ admin.alterClusterConfigs(
 admin.alterClusterConfigs(
         Collections.singletonList(
                 new AlterConfig(DATALAKE_FORMAT.key(), "paimon", 
AlterConfigOpType.DELETE)));
+
+// Set RocksDB shared rate limiter to 200MB/sec
+admin.alterClusterConfigs(
+        Collections.singletonList(
+                new AlterConfig("kv.shared-rate-limiter-bytes-per-sec", 
"209715200", AlterConfigOpType.SET)));
 ```
 
 The `AlterConfig` class contains three properties:
 * `key`: The configuration key to be modified (e.g., `datalake.format`)
 * `value`: The configuration value to be set (e.g., `paimon`)
 * `opType`: The operation type, either `AlterConfigOpType.SET` or 
`AlterConfigOpType.DELETE`
 
+### Using Flink Stored Procedures
+
+For certain configurations, Fluss provides convenient Flink stored procedures 
that can be called directly from Flink SQL.
+
+#### Managing RocksDB Rate Limiter
+
+You can dynamically adjust the shared RocksDB rate limiter for all 
TabletServers using Flink stored procedures:
+
+**Set Shared RocksDB Rate Limiter:**
+```sql title="Flink SQL"
+-- Set rate limiter to 200MB/sec (recommended, use named argument, only 
supported since Flink 1.19)
+CALL fluss_catalog.sys.set_shared_rocksdb_rate_limiter(rate_limit => '200MB');

Review Comment:
   Invoke `USE fluss_catalog` before all the `CALL` statements. This makes the `CALL` statements more reusable, as the catalog name may be different for different users.



##########
website/docs/maintenance/operations/updating-configs.md:
##########
@@ -30,13 +34,50 @@ admin.alterClusterConfigs(
 admin.alterClusterConfigs(
         Collections.singletonList(
                 new AlterConfig(DATALAKE_FORMAT.key(), "paimon", 
AlterConfigOpType.DELETE)));
+
+// Set RocksDB shared rate limiter to 200MB/sec
+admin.alterClusterConfigs(
+        Collections.singletonList(
+                new AlterConfig("kv.shared-rate-limiter-bytes-per-sec", 
"209715200", AlterConfigOpType.SET)));

Review Comment:
   ```suggestion
                   new AlterConfig(KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(), 
"200MB", AlterConfigOpType.SET)));
   ```
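
   For reference, the raw value in the original line and `200MB` describe the same limit if `MB` is read as 1024 * 1024 bytes. That reading is an assumption about how the `MemorySize` suffix is parsed, and `MemorySizeArithmeticSketch` is a hypothetical helper used only for the arithmetic:

   ```java
   // Hypothetical helper (assumption): converts megabytes to bytes using 1 MB = 1024 * 1024 bytes.
   final class MemorySizeArithmeticSketch {

       static long megabytesToBytes(long megabytes) {
           // multiplyExact throws ArithmeticException on overflow instead of wrapping silently.
           return Math.multiplyExact(megabytes, 1024L * 1024L);
       }

       public static void main(String[] args) {
           // 200 * 1024 * 1024 = 209715200, the literal used in the original doc line.
           System.out.println(megabytesToBytes(200));
       }
   }
   ```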



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
