wuchong commented on code in PR #2178:
URL: https://github.com/apache/fluss/pull/2178#discussion_r2637686411
##########
website/docs/maintenance/operations/updating-configs.md:
##########
@@ -30,13 +34,50 @@ admin.alterClusterConfigs(
admin.alterClusterConfigs(
Collections.singletonList(
        new AlterConfig(DATALAKE_FORMAT.key(), "paimon", AlterConfigOpType.DELETE)));
+
+// Set RocksDB shared rate limiter to 200MB/sec
+admin.alterClusterConfigs(
+ Collections.singletonList(
+        new AlterConfig("kv.shared-rate-limiter-bytes-per-sec", "209715200", AlterConfigOpType.SET)));
```
The `AlterConfig` class contains three properties:
* `key`: The configuration key to be modified (e.g., `datalake.format`)
* `value`: The configuration value to be set (e.g., `paimon`)
* `opType`: The operation type, either `AlterConfigOpType.SET` or `AlterConfigOpType.DELETE`
+### Using Flink Stored Procedures
+
+For certain configurations, Fluss provides convenient Flink stored procedures that can be called directly from Flink SQL.
+
+#### Managing RocksDB Rate Limiter
+
+You can dynamically adjust the shared RocksDB rate limiter for all TabletServers using Flink stored procedures:
+
+**Set Shared RocksDB Rate Limiter:**
+```sql title="Flink SQL"
+-- Set rate limiter to 200MB/sec (recommended, use named argument, only supported since Flink 1.19)
+CALL fluss_catalog.sys.set_shared_rocksdb_rate_limiter(rate_limit => '200MB');
+
+-- Set rate limiter to 500MB/sec (use indexed argument)
+CALL fluss_catalog.sys.set_shared_rocksdb_rate_limiter('500MB');
Review Comment:
It seems this is not supported yet. Should the documentation use `set_cluster_config` instead?
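
For illustration, the documented example rewritten around the generic procedure might look like the sketch below. This is hypothetical: the `(key, value)` argument shape of `set_cluster_config` is an assumption, not confirmed by this PR, so verify the actual signature first.

```sql
-- Hypothetical sketch; assumes set_cluster_config(key, value).
USE CATALOG fluss_catalog;
CALL sys.set_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200mb');
```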
##########
website/docs/maintenance/operations/updating-configs.md:
##########
@@ -15,9 +15,13 @@ From Fluss version 0.8 onwards, some of the server configs can be updated withou
Currently, the supported dynamically updatable server configurations include:
- `datalake.format`: Enable lakehouse storage by specifying the lakehouse format, e.g., `paimon`, `iceberg`.
- Options with prefix `datalake.${datalake.format}`
+- `kv.shared-rate-limiter-bytes-per-sec`: Control RocksDB flush and compaction write rate shared across all RocksDB instances on the TabletServer.
-You can update the configuration of a cluster with [Java client](apis/java-client.md).
+You can update the configuration of a cluster with [Java client](apis/java-client.md) or [Flink Stored Procedures](#using-flink-stored-procedures).
Review Comment:
```suggestion
You can update the configuration of a cluster with [Java client](#using-java-client) or [Flink Stored Procedures](#using-flink-stored-procedures).
```
##########
website/docs/maintenance/configuration.md:
##########
@@ -156,6 +156,7 @@ during the Fluss cluster working.
| kv.rocksdb.use-bloom-filter | Boolean | true | If true, every newly created SST file will contain a Bloom filter. It is enabled by default. |
| kv.rocksdb.bloom-filter.bits-per-key | Double | 10.0 | Bits per key that bloom filter will use, this only take effect when bloom filter is used. The default value is 10.0. |
| kv.rocksdb.bloom-filter.block-based-mode | Boolean | false | If true, RocksDB will use block-based filter instead of full filter, this only take effect when bloom filter is used. The default value is `false`. |
+| kv.shared-rate-limiter-bytes-per-sec | MemorySize | 0b | The bytes per second rate limit for RocksDB flush and compaction operations shared across all RocksDB instances on the TabletServer. A value of 0 (default) means rate limiting is disabled. This configuration can be updated dynamically without server restart. See [Updating Configs](operations/updating-configs.md) for more details. |
Review Comment:
Update the config key and the default value
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigProcedure.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.procedure;
+
+import org.apache.fluss.config.cluster.ConfigEntry;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.types.Row;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Procedure to get cluster configuration(s).
+ *
+ * <p>This procedure allows querying dynamic cluster configurations. It can retrieve:
+ *
+ * <ul>
+ * <li>A specific configuration key
+ * <li>All configurations (when key parameter is null or empty)
+ * </ul>
+ *
+ * <p>Usage examples:
+ *
+ * <pre>
+ * -- Get a specific configuration
+ * CALL sys.get_cluster_config('kv.shared-rate-limiter.bytes-per-sec');
+ *
+ * -- Get all cluster configurations
+ * CALL sys.get_cluster_config();
+ * </pre>
+ */
+public class GetClusterConfigProcedure extends ProcedureBase {
+
+    @ProcedureHint(
+            output =
+                    @DataTypeHint(
+                            "ROW<config_key STRING, config_value STRING, config_source STRING>"))
+    public Row[] call(ProcedureContext context) throws Exception {
+        return getConfigs(null);
+    }
+
+    @ProcedureHint(
+            argument = {@ArgumentHint(name = "config_key", type = @DataTypeHint("STRING"))},
+            output =
+                    @DataTypeHint(
+                            "ROW<config_key STRING, config_value STRING, config_source STRING>"))
+    public Row[] call(ProcedureContext context, String configKey) throws Exception {
+        return getConfigs(configKey);
+    }
+
+    private Row[] getConfigs(@Nullable String configKey) throws Exception {
+        try {
+            // Get all cluster configurations
+            Collection<ConfigEntry> configs = admin.describeClusterConfigs().get();
+
+            List<Row> results = new ArrayList<>();
+
+            if (configKey == null || configKey.isEmpty()) {
+                // Return all configurations
+                for (ConfigEntry entry : configs) {
+                    results.add(
+                            Row.of(
+                                    entry.key(),
+                                    entry.value(),
+                                    entry.source() != null ? entry.source().name() : "UNKNOWN"));
+                }
+
+                if (results.isEmpty()) {
+                    return new Row[] {Row.of("No cluster configurations found", null, null)};
+                }
+            } else {
+                // Find specific configuration
+                for (ConfigEntry entry : configs) {
+                    if (entry.key().equals(configKey)) {
+                        results.add(
+                                Row.of(
+                                        entry.key(),
+                                        entry.value(),
+                                        entry.source() != null
+                                                ? entry.source().name()
+                                                : "UNKNOWN"));
+                        break;
+                    }
+                }
+
+                if (results.isEmpty()) {
+                    return new Row[] {
+                        Row.of(
+                                String.format("Configuration key '%s' not found", configKey),
+                                null,
+                                null)
Review Comment:
ditto
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigProcedure.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.procedure;
+
+import org.apache.fluss.config.cluster.ConfigEntry;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.types.Row;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Procedure to get cluster configuration(s).
+ *
+ * <p>This procedure allows querying dynamic cluster configurations. It can retrieve:
+ *
+ * <ul>
+ * <li>A specific configuration key
+ * <li>All configurations (when key parameter is null or empty)
+ * </ul>
+ *
+ * <p>Usage examples:
+ *
+ * <pre>
+ * -- Get a specific configuration
+ * CALL sys.get_cluster_config('kv.shared-rate-limiter.bytes-per-sec');
Review Comment:
Replace all the `kv.shared-rate-limiter.bytes-per-sec` in docs/javadocs to
`kv.rocksdb.shared-rate-limiter.bytes-per-sec` which is the true config key.
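
One way to do the bulk rename is a `grep`/`sed` pass over the affected sources. The snippet below demonstrates on a scratch file; the paths are illustrative (not the PR's tree) and it assumes GNU `sed -i`.

```shell
# Demonstration on a scratch file (hypothetical path); assumes GNU sed -i.
mkdir -p /tmp/fluss-doc-fix && cd /tmp/fluss-doc-fix
echo "CALL sys.get_cluster_config('kv.shared-rate-limiter.bytes-per-sec');" > sample.md
# In the real repo this would target website/docs and the javadoc sources.
grep -rl 'kv\.shared-rate-limiter\.bytes-per-sec' . \
  | xargs sed -i 's/kv\.shared-rate-limiter\.bytes-per-sec/kv.rocksdb.shared-rate-limiter.bytes-per-sec/g'
cat sample.md
```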
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigProcedure.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.procedure;
+
+import org.apache.fluss.config.cluster.ConfigEntry;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.types.Row;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Procedure to get cluster configuration(s).
+ *
+ * <p>This procedure allows querying dynamic cluster configurations. It can
retrieve:
+ *
+ * <ul>
+ * <li>A specific configuration key
+ * <li>All configurations (when key parameter is null or empty)
+ * </ul>
+ *
+ * <p>Usage examples:
+ *
+ * <pre>
+ * -- Get a specific configuration
+ * CALL sys.get_cluster_config('kv.shared-rate-limiter.bytes-per-sec');
+ *
+ * -- Get all cluster configurations
+ * CALL sys.get_cluster_config();
+ * </pre>
+ */
+public class GetClusterConfigProcedure extends ProcedureBase {
+
+    @ProcedureHint(
+            output =
+                    @DataTypeHint(
+                            "ROW<config_key STRING, config_value STRING, config_source STRING>"))
+    public Row[] call(ProcedureContext context) throws Exception {
+        return getConfigs(null);
+    }
+
+    @ProcedureHint(
+            argument = {@ArgumentHint(name = "config_key", type = @DataTypeHint("STRING"))},
+            output =
+                    @DataTypeHint(
+                            "ROW<config_key STRING, config_value STRING, config_source STRING>"))
+    public Row[] call(ProcedureContext context, String configKey) throws Exception {
+        return getConfigs(configKey);
+    }
+
+    private Row[] getConfigs(@Nullable String configKey) throws Exception {
+        try {
+            // Get all cluster configurations
+            Collection<ConfigEntry> configs = admin.describeClusterConfigs().get();
+
+            List<Row> results = new ArrayList<>();
+
+            if (configKey == null || configKey.isEmpty()) {
+                // Return all configurations
+                for (ConfigEntry entry : configs) {
+                    results.add(
+                            Row.of(
+                                    entry.key(),
+                                    entry.value(),
+                                    entry.source() != null ? entry.source().name() : "UNKNOWN"));
+                }
+
+                if (results.isEmpty()) {
+                    return new Row[] {Row.of("No cluster configurations found", null, null)};
Review Comment:
If there is no configuration, should this just return an empty result, like a SELECT from an empty table? Otherwise it may be confusing that the config result contains a row like:

config_key | config_value | config_source
-----|-----|-----
No cluster configurations found | |
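
A minimal sketch of the suggested behavior, using illustrative names rather than the PR's actual `Row`-based API: a missing or empty result yields an empty array, never a synthetic placeholder row.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch only: ConfigLookup is a hypothetical stand-in for the procedure's
// lookup logic, with String[] rows instead of Flink's Row type.
public class ConfigLookup {
    public static List<String[]> lookup(Map<String, String> configs, String key) {
        List<String[]> rows = new ArrayList<>();
        if (key == null || key.isEmpty()) {
            // Null/empty key means "list everything".
            for (Map.Entry<String, String> e : configs.entrySet()) {
                rows.add(new String[] {e.getKey(), e.getValue()});
            }
        } else if (configs.containsKey(key)) {
            rows.add(new String[] {key, configs.get(key)});
        }
        return rows; // empty list when nothing matches -- no placeholder row
    }
}
```

An empty list converts naturally to `new Row[0]` at the call site, which renders as zero rows in Flink SQL.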
##########
fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java:
##########
@@ -214,10 +246,13 @@ public static KvTablet create(
schemaGetter);
}
-    private static RocksDBKv buildRocksDBKv(Configuration configuration, File kvDir)
+    private static RocksDBKv buildRocksDBKv(
+            Configuration configuration,
+            File kvDir,
+            @Nullable org.rocksdb.RateLimiter sharedRateLimiter)
Review Comment:
Can we replace the qualified class name with `import`?
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigProcedure.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.procedure;
+
+import org.apache.fluss.config.cluster.ConfigEntry;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.types.Row;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Procedure to get cluster configuration(s).
+ *
+ * <p>This procedure allows querying dynamic cluster configurations. It can retrieve:
+ *
+ * <ul>
+ * <li>A specific configuration key
+ * <li>All configurations (when key parameter is null or empty)
+ * </ul>
+ *
+ * <p>Usage examples:
+ *
+ * <pre>
+ * -- Get a specific configuration
+ * CALL sys.get_cluster_config('kv.shared-rate-limiter.bytes-per-sec');
+ *
+ * -- Get all cluster configurations
+ * CALL sys.get_cluster_config();
Review Comment:
Could you please add a **“Procedures”** section under the **“DDL”** section
in the **“Engine Flink”** documentation to list all supported procedures?
This can be tracked in a separate issue if preferred.
For the format, I’d recommend following the style used by Iceberg, using
**headers and descriptive text** rather than tables.
Examples:
- https://iceberg.apache.org/docs/nightly/spark-procedures/#spark-procedures
- https://paimon.apache.org/docs/master/flink/procedures/
##########
fluss-server/src/main/java/org/apache/fluss/server/kv/KvConfigValidator.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.MemorySize;
+import org.apache.fluss.config.cluster.ConfigValidator;
+import org.apache.fluss.exception.ConfigException;
+
+import javax.annotation.Nullable;
+
+/**
+ * Stateless validator for RocksDB shared rate limiter configuration.
+ *
+ * <p>This validator can be registered on CoordinatorServer without requiring a KvManager instance,
+ * enabling early validation of KV configs before they are persisted to ZooKeeper.
+ *
+ * <p>The validator enforces the following rules for RocksDB shared rate limiter:
+ *
+ * <ul>
+ *   <li>Cannot enable rate limiter dynamically if it was not enabled at server startup
+ *   <li>Cannot disable rate limiter dynamically if it was enabled at server startup
Review Comment:
I don’t fully understand the reason behind this limitation. Is there significant overhead in always having a `RateLimiter` enabled in RocksDB, even when it is configured with a very high throughput limit (e.g., 1 TB/s)?
If the performance cost is negligible, and especially since we’re enabling the rate limiter by default, I’d suggest treating a very high value (or `Long.MAX_VALUE`) as the effective “disabled” state (this also means we don't rely on `0` to disable).
I think switching between enabled and disabled is a common and useful capability. Otherwise it is confusing that the documentation says to set `0` to disable when that doesn't actually work.
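
The suggested semantics could be sketched with a small helper (hypothetical, not in the PR): any non-positive configured rate maps to an "unlimited" sentinel, so the limiter object can stay allocated while enable/disable becomes a plain value change.

```java
// Hypothetical helper illustrating the proposed semantics: the RocksDB
// RateLimiter is always created, and "disabled" is just a sentinel rate.
public class RateLimitSemantics {
    // Sentinel meaning "no effective limiting".
    public static final long UNLIMITED = Long.MAX_VALUE;

    // A configured value <= 0 means disabled, i.e. unlimited throughput.
    public static long effectiveBytesPerSec(long configuredBytesPerSec) {
        return configuredBytesPerSec <= 0 ? UNLIMITED : configuredBytesPerSec;
    }

    public static boolean isLimited(long configuredBytesPerSec) {
        return effectiveBytesPerSec(configuredBytesPerSec) != UNLIMITED;
    }
}
```

With this, a dynamic update to `0` would translate to `rateLimiter.setBytesPerSecond(UNLIMITED)` on the existing limiter instead of being rejected.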
##########
website/docs/maintenance/operations/updating-configs.md:
##########
@@ -30,13 +34,50 @@ admin.alterClusterConfigs(
admin.alterClusterConfigs(
Collections.singletonList(
        new AlterConfig(DATALAKE_FORMAT.key(), "paimon", AlterConfigOpType.DELETE)));
+
+// Set RocksDB shared rate limiter to 200MB/sec
+admin.alterClusterConfigs(
+ Collections.singletonList(
+        new AlterConfig("kv.shared-rate-limiter-bytes-per-sec", "209715200", AlterConfigOpType.SET)));
```
The `AlterConfig` class contains three properties:
* `key`: The configuration key to be modified (e.g., `datalake.format`)
* `value`: The configuration value to be set (e.g., `paimon`)
* `opType`: The operation type, either `AlterConfigOpType.SET` or `AlterConfigOpType.DELETE`
+### Using Flink Stored Procedures
+
+For certain configurations, Fluss provides convenient Flink stored procedures that can be called directly from Flink SQL.
+
+#### Managing RocksDB Rate Limiter
+
+You can dynamically adjust the shared RocksDB rate limiter for all TabletServers using Flink stored procedures:
+
+**Set Shared RocksDB Rate Limiter:**
+```sql title="Flink SQL"
+-- Set rate limiter to 200MB/sec (recommended, use named argument, only supported since Flink 1.19)
+CALL fluss_catalog.sys.set_shared_rocksdb_rate_limiter(rate_limit => '200MB');
Review Comment:
Invoke `USE CATALOG fluss_catalog` before all the `CALL` statements. This makes the `CALL` statements more reusable, as the catalog name may differ between users.
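
For illustration, the doc examples could be restructured like this (procedure and catalog names taken from the quoted doc snippet; a separate comment in this thread questions whether this procedure exists yet):

```sql title="Flink SQL"
-- Switch once; every subsequent CALL is then catalog-agnostic.
USE CATALOG fluss_catalog;

CALL sys.set_shared_rocksdb_rate_limiter(rate_limit => '200MB');
```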
##########
website/docs/maintenance/operations/updating-configs.md:
##########
@@ -30,13 +34,50 @@ admin.alterClusterConfigs(
admin.alterClusterConfigs(
Collections.singletonList(
        new AlterConfig(DATALAKE_FORMAT.key(), "paimon", AlterConfigOpType.DELETE)));
+
+// Set RocksDB shared rate limiter to 200MB/sec
+admin.alterClusterConfigs(
+ Collections.singletonList(
+        new AlterConfig("kv.shared-rate-limiter-bytes-per-sec", "209715200", AlterConfigOpType.SET)));
Review Comment:
```suggestion
            new AlterConfig(KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(), "200MB", AlterConfigOpType.SET)));
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]