This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 23042f3bc [zk] Allow to set jute.maxbuffer for zk client (#2048)
23042f3bc is described below
commit 23042f3bc61291768b2d5a0f884f2998dbd99776
Author: yuxia Luo <[email protected]>
AuthorDate: Fri Nov 28 19:38:14 2025 +0800
[zk] Allow to set jute.maxbuffer for zk client (#2048)
---
.../src/main/java/org/apache/fluss/config/ConfigOptions.java | 10 ++++++++++
.../main/java/org/apache/fluss/server/zk/ZooKeeperUtils.java | 11 ++++++++++-
2 files changed, 20 insertions(+), 1 deletion(-)
diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
index 875e875d5..81984015c 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
@@ -572,6 +572,16 @@ public class ConfigOptions {
+ "This allows each ZooKeeper client instance to load its own configuration file, "
+ "instead of relying on shared JVM-level environment settings. "
+ "This enables fine-grained control over ZooKeeper client behavior.");
+
+ public static final ConfigOption<Integer> ZOOKEEPER_MAX_BUFFER_SIZE =
+ key("zookeeper.client.max-buffer-size")
+ .intType()
+ .defaultValue(100 * 1024 * 1024) // 100MB
+ .withDescription(
+ "The maximum buffer size (in bytes) for ZooKeeper client. "
+ + "This corresponds to the jute.maxbuffer property. "
+ + "Default is 100MB to match the RPC frame length limit.");
+
// ------------------------------------------------------------------------
// ConfigOptions for Log
// ------------------------------------------------------------------------
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperUtils.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperUtils.java
index c74ff03ad..61336ef8f 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperUtils.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperUtils.java
@@ -40,6 +40,7 @@ import java.util.Arrays;
import java.util.Optional;
import java.util.stream.Collectors;
+import static org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.common.ZKConfig.JUTE_MAXBUFFER;
import static org.apache.fluss.utils.Preconditions.checkNotNull;
/** Class containing helper functions to interact with ZooKeeper. */
@@ -104,11 +105,12 @@ public class ZooKeeperUtils {
new SessionConnectionStateErrorPolicy());
}
+ ZKClientConfig zkClientConfig;
Optional<String> configPath =
configuration.getOptional(ConfigOptions.ZOOKEEPER_CONFIG_PATH);
if (configPath.isPresent()) {
try {
- ZKClientConfig zkClientConfig = new ZKClientConfig(configPath.get());
+ zkClientConfig = new ZKClientConfig(configPath.get());
curatorFrameworkBuilder.zkClientConfig(zkClientConfig);
} catch (QuorumPeerConfig.ConfigException e) {
LOG.warn("Fail to load zookeeper client config from path {}",
configPath.get(), e);
@@ -118,7 +120,14 @@ public class ZooKeeperUtils {
configPath.get()),
e);
}
+ } else {
+ zkClientConfig = new ZKClientConfig();
}
+ // set jute.max buffer
+ zkClientConfig.setProperty(
+ JUTE_MAXBUFFER,
+ String.valueOf(configuration.getInt(ConfigOptions.ZOOKEEPER_MAX_BUFFER_SIZE)));
+
return new ZooKeeperClient(
startZookeeperClient(curatorFrameworkBuilder,
fatalErrorHandler), configuration);
}