This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new d9972404759 [fix][broker]Fix never recovered metadata store bad 
version issue if received a large response from ZK (#24580)
d9972404759 is described below

commit d9972404759fcca132f07f08fd16778143acdbda
Author: fengyubiao <yubiao.f...@streamnative.io>
AuthorDate: Thu Sep 11 18:58:27 2025 +0800

    [fix][broker]Fix never recovered metadata store bad version issue if 
received a large response from ZK (#24580)
    
    (cherry picked from commit 1cb64a921c946bfcebb16890a38b9e748ecb0e62)
---
 .../org/apache/pulsar/broker/PulsarService.java    |   3 +
 .../zookeeper/DefaultMetadataNodeSizeStats.java    | 262 +++++++++
 .../broker/service/BrokerServiceChaosTest.java     |   3 +
 .../CanReconnectZKClientPulsarServiceBaseTest.java |   4 +-
 .../ZKMetadataStoreBatchIOperationTest.java        | 105 ++++
 .../DefaultMetadataNodeSizeStatsSplitPathTest.java | 273 +++++++++
 .../DefaultMetadataNodeSizeStatsTest.java          | 484 ++++++++++++++++
 .../metadata/api/DummyMetadataNodeSizeStats.java   |  43 ++
 .../pulsar/metadata/api/MetadataNodeSizeStats.java |  56 ++
 .../pulsar/metadata/api/MetadataStoreConfig.java   |   5 +
 .../metadata/impl/AbstractMetadataStore.java       |  17 +-
 .../metadata/impl/LocalMemoryMetadataStore.java    |   3 +-
 .../pulsar/metadata/impl/RocksdbMetadataStore.java |   3 +-
 .../pulsar/metadata/impl/ZKMetadataStore.java      |  40 +-
 .../batching/AbstractBatchedMetadataStore.java     |  45 +-
 .../DefaultMetadataStoreBatchStrategy.java         |  68 +++
 .../impl/batching/MetadataStoreBatchStrategy.java  |  33 ++
 .../batching/ZKMetadataStoreBatchStrategy.java     | 131 +++++
 .../metadata/impl/oxia/OxiaMetadataStore.java      |   5 +-
 .../impl/MetadataStoreFactoryImplTest.java         |   2 +-
 .../batching/ZKMetadataStoreBatchStrategyTest.java | 637 +++++++++++++++++++++
 .../java/org/apache/zookeeper/MockZooKeeper.java   |   8 +
 .../org/apache/zookeeper/MockZooKeeperSession.java |   9 +
 23 files changed, 2190 insertions(+), 49 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index fde755403ff..3d93355770f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -196,6 +196,7 @@ import 
org.apache.pulsar.websocket.WebSocketMultiTopicConsumerServlet;
 import org.apache.pulsar.websocket.WebSocketProducerServlet;
 import org.apache.pulsar.websocket.WebSocketReaderServlet;
 import org.apache.pulsar.websocket.WebSocketService;
+import org.apache.pulsar.zookeeper.DefaultMetadataNodeSizeStats;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
 import org.slf4j.Logger;
@@ -427,6 +428,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                         
.metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
                         .synchronizer(synchronizer)
                         .openTelemetry(openTelemetry)
+                        .nodeSizeStats(new DefaultMetadataNodeSizeStats())
                         .build());
     }
 
@@ -1274,6 +1276,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                         .synchronizer(synchronizer)
                         .metadataStoreName(MetadataStoreConfig.METADATA_STORE)
                         .openTelemetry(openTelemetry)
+                        .nodeSizeStats(new DefaultMetadataNodeSizeStats())
                         .build());
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/DefaultMetadataNodeSizeStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/DefaultMetadataNodeSizeStats.java
new file mode 100644
index 00000000000..433d47624f1
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/DefaultMetadataNodeSizeStats.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.zookeeper;
+
+import io.netty.util.concurrent.FastThreadLocal;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataNodeSizeStats;
+
+@Slf4j
+public class DefaultMetadataNodeSizeStats implements MetadataNodeSizeStats {
+
+    public static final int UNSET = -1;
+
+    private static final SplitPathRes MEANINGLESS_SPLIT_PATH_RES = new 
SplitPathRes();
+    private static final FastThreadLocal<SplitPathRes> LOCAL_SPLIT_PATH_RES = 
new FastThreadLocal<SplitPathRes>() {
+        @Override
+        protected SplitPathRes initialValue() {
+            return new SplitPathRes();
+        }
+    };
+    private final AtomicReferenceArray<Integer> maxSizeMapping;
+    private final AtomicReferenceArray<Integer> maxChildrenCountMapping;
+
+   public DefaultMetadataNodeSizeStats() {
+        int pathTypeCount = PathType.values().length;
+        maxSizeMapping = new AtomicReferenceArray<>(pathTypeCount);
+        maxChildrenCountMapping = new AtomicReferenceArray<>(pathTypeCount);
+        for (int i = 0; i < pathTypeCount; i++) {
+            maxSizeMapping.set(i, UNSET);
+            maxChildrenCountMapping.set(i, UNSET);
+        }
+    }
+
+    @Override
+    public void recordPut(String path, byte[] data) {
+        PathType pathType = getPathType(path);
+        if (pathType == PathType.UNKNOWN) {
+            return;
+        }
+        maxSizeMapping.set(pathType.ordinal(), 
Math.max(maxSizeMapping.get(pathType.ordinal()), data.length));
+    }
+
+    @Override
+    public void recordGetRes(String path, GetResult getResult) {
+        PathType pathType = getPathType(path);
+        if (pathType == PathType.UNKNOWN || getResult == null) {
+            return;
+        }
+        maxSizeMapping.set(pathType.ordinal(), 
Math.max(maxSizeMapping.get(pathType.ordinal()),
+                getResult.getValue().length));
+    }
+
+    @Override
+    public void recordGetChildrenRes(String path, List<String> list) {
+        PathType pathType = getPathType(path);
+        if (pathType == PathType.UNKNOWN) {
+            return;
+        }
+        int size = CollectionUtils.isEmpty(list) ? 0 : list.size();
+        maxChildrenCountMapping.set(pathType.ordinal(), 
Math.max(maxChildrenCountMapping.get(pathType.ordinal()),
+                size));
+    }
+
+    @Override
+    public int getMaxSizeOfSameResourceType(String path) {
+        PathType pathType = getPathType(path);
+        if (pathType == PathType.UNKNOWN) {
+            return -1;
+        }
+        return maxSizeMapping.get(pathType.ordinal());
+    }
+
+    @Override
+    public int getMaxChildrenCountOfSameResourceType(String path) {
+        PathType pathType = getPathType(path);
+        if (pathType == PathType.UNKNOWN) {
+            return -1;
+        }
+        return maxChildrenCountMapping.get(pathType.ordinal());
+    }
+
+    private PathType getPathType(String path) {
+        SplitPathRes splitPathRes = splitPath(path);
+        if (splitPathRes.partCount < 2) {
+            return PathType.UNKNOWN;
+        }
+        return switch (splitPathRes.parts[0]) {
+            case "admin" -> getAdminPathType(splitPathRes);
+            case "managed-ledgers" -> getMlPathType(splitPathRes);
+            case "loadbalance" -> getLoadBalancePathType(splitPathRes);
+            case "namespace" -> getBundleOwnerPathType(splitPathRes);
+            case "schemas" -> getSchemaPathType(splitPathRes);
+            default -> PathType.UNKNOWN;
+        };
+    }
+
+    private PathType getAdminPathType(SplitPathRes splitPathRes) {
+        return switch (splitPathRes.parts[1]) {
+            case "clusters" -> PathType.CLUSTER;
+            case "policies" -> switch (splitPathRes.partCount) {
+                case 3 -> PathType.TENANT;
+                case 4 -> PathType.NAMESPACE_POLICIES;
+                default -> PathType.UNKNOWN;
+            };
+            case "local-policies" -> switch (splitPathRes.partCount) {
+                case 4 -> PathType.NAMESPACE_POLICIES;
+                default -> PathType.UNKNOWN;
+            };
+            case "partitioned-topics" -> switch (splitPathRes.partCount) {
+                case 5 -> PathType.PARTITIONED_NAMESPACE;
+                case 6 -> PathType.PARTITIONED_TOPIC;
+                default -> PathType.UNKNOWN;
+            };
+            default -> PathType.UNKNOWN;
+        };
+    }
+
+    private PathType getBundleOwnerPathType(SplitPathRes splitPathRes) {
+       return switch (splitPathRes.partCount) {
+           case 3 -> PathType.BUNDLE_OWNER_NAMESPACE;
+           case 4 -> PathType.BUNDLE_OWNER;
+           default -> PathType.UNKNOWN;
+       };
+    }
+
+    private PathType getSchemaPathType(SplitPathRes splitPathRes) {
+       if (splitPathRes.partCount == 4) {
+           return PathType.TOPIC_SCHEMA;
+       }
+       return PathType.UNKNOWN;
+    }
+
+    private PathType getLoadBalancePathType(SplitPathRes splitPathRes) {
+        return switch (splitPathRes.parts[1]) {
+            case "brokers" -> switch (splitPathRes.partCount) {
+                case 2 -> PathType.BROKERS;
+                case 3 -> PathType.BROKER;
+                default -> PathType.UNKNOWN;
+            };
+            case "bundle-data" -> switch (splitPathRes.partCount) {
+                case 4 -> PathType.BUNDLE_NAMESPACE;
+                case 5 -> PathType.BUNDLE_DATA;
+                default -> PathType.UNKNOWN;
+            };
+            case "broker-time-average" -> switch (splitPathRes.partCount) {
+                case 3 -> PathType.BROKER_TIME_AVERAGE;
+                default -> PathType.UNKNOWN;
+            };
+            case "leader" -> PathType.BROKER_LEADER;
+            default -> PathType.UNKNOWN;
+        };
+    }
+
+    private PathType getMlPathType(SplitPathRes splitPathRes) {
+        return switch (splitPathRes.partCount) {
+            case 4 -> PathType.ML_NAMESPACE;
+            case 5 -> PathType.TOPIC;
+            // v2 subscription and v1 topic.
+            case 6 -> PathType.SUBSCRIPTION;
+            // v1 subscription.
+            case 7 -> PathType.SUBSCRIPTION;
+            default -> PathType.UNKNOWN;
+        };
+    }
+
+    enum PathType {
+        // admin
+        CLUSTER,
+        TENANT,
+        NAMESPACE_POLICIES,
+        LOCAL_POLICIES,
+        // load-balance
+        BROKERS,
+        BROKER,
+        BROKER_LEADER,
+        BUNDLE_NAMESPACE,
+        BUNDLE_DATA,
+        BROKER_TIME_AVERAGE ,
+        BUNDLE_OWNER_NAMESPACE ,
+        BUNDLE_OWNER ,
+        // topic schema
+        TOPIC_SCHEMA,
+        // partitioned topics.
+        PARTITIONED_TOPIC,
+        PARTITIONED_NAMESPACE,
+        // managed ledger.
+        ML_NAMESPACE,
+        TOPIC,
+        SUBSCRIPTION,
+        UNKNOWN;
+    }
+
+    static class SplitPathRes {
+        String[] parts = new String[2];
+        int partCount;
+
+        void reset() {
+            parts[0] = null;
+            parts[1] = null;
+            partCount = 0;
+        }
+    }
+
+    /**
+     * Split the path by the delimiter '/', count the non-empty parts, and keep only the first two parts.
+     */
+    static SplitPathRes splitPath(String path) {
+        if (path == null || path.length() <= 1) {
+            return MEANINGLESS_SPLIT_PATH_RES;
+        }
+        SplitPathRes res = LOCAL_SPLIT_PATH_RES.get();
+        res.reset();
+        String[] parts = res.parts;
+        char delimiter = '/';
+        int length = path.length();
+        int start = 0;
+        int count = 0;
+        for (int i = 0; i < length; i++) {
+            if (path.charAt(i) == delimiter) {
+                // Skip empty segments produced by a leading delimiter or by consecutive delimiters.
+                if (start == i) {
+                    start = i + 1;
+                    continue;
+                }
+                // Only keep the first two parts.
+                if (count < 2) {
+                    parts[count] = path.substring(start, i);
+                }
+                start = i + 1;
+                count++;
+            }
+        }
+        if (start < length) {
+            if (count < 2) {
+                parts[count] = path.substring(start);
+            }
+            count++;
+        }
+        res.partCount = count;
+        return res;
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java
index 5650fe6e72f..96df607d7e4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java
@@ -97,5 +97,8 @@ public class BrokerServiceChaosTest extends 
CanReconnectZKClientPulsarServiceBas
         PartitionedTopicMetadata partitionedTopicMetadata3 =
                 
pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName2, 
true).get();
         assertEquals(partitionedTopicMetadata3.partitions, 3);
+
+        // cleanup.
+        stopLocalMetadataStoreConnectionTermination();
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java
index 41fe580ce2b..d3a780633c0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java
@@ -198,7 +198,7 @@ public abstract class 
CanReconnectZKClientPulsarServiceBaseTest extends TestRetr
         log.info("--- OneWayReplicatorTestBase::setup completed ---");
     }
 
-    private void setConfigDefaults(ServiceConfiguration config, String 
clusterName,
+    protected void setConfigDefaults(ServiceConfiguration config, String 
clusterName,
                                    LocalBookkeeperEnsemble bookkeeperEnsemble, 
ZookeeperServerTest brokerConfigZk) {
         config.setClusterName(clusterName);
         config.setAdvertisedAddress("localhost");
@@ -227,8 +227,6 @@ public abstract class 
CanReconnectZKClientPulsarServiceBaseTest extends TestRetr
         markCurrentSetupNumberCleaned();
         log.info("--- Shutting down ---");
 
-        stopLocalMetadataStoreConnectionTermination();
-
         // Stop brokers.
         if (client != null) {
             client.close();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZKMetadataStoreBatchIOperationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZKMetadataStoreBatchIOperationTest.java
new file mode 100644
index 00000000000..bab495bb5f0
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZKMetadataStoreBatchIOperationTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.apache.pulsar.zookeeper.ZookeeperServerTest;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class ZKMetadataStoreBatchIOperationTest extends 
CanReconnectZKClientPulsarServiceBaseTest {
+
+    @Override
+    @BeforeClass(alwaysRun = true, timeOut = 300000)
+    public void setup() throws Exception {
+        System.setProperty("jute.maxbuffer", "16384");
+        super.setup();
+    }
+
+    @Override
+    @AfterClass(alwaysRun = true, timeOut = 300000)
+    public void cleanup() throws Exception {
+        System.clearProperty("jute.maxbuffer");
+        super.cleanup();
+    }
+
+    protected void setConfigDefaults(ServiceConfiguration config, String 
clusterName,
+                                     LocalBookkeeperEnsemble 
bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) {
+        super.setConfigDefaults(config, clusterName, bookkeeperEnsemble, 
brokerConfigZk);
+        config.setMetadataStoreBatchingEnabled(true);
+        config.setMetadataStoreBatchingMaxOperations(1000);
+        config.setMetadataStoreBatchingMaxSizeKb(128);
+        config.setMetadataStoreBatchingMaxDelayMillis(20);
+    }
+
+    @Test(timeOut = 1000 * 60 * 2)
+    public void testReceivedHugeResponse() throws Exception {
+        int maxPacketLen = 
Integer.parseInt(System.getProperty("jute.maxbuffer",
+                ZKClientConfig.CLIENT_MAX_PACKET_LENGTH_DEFAULT + ""));
+        String defaultTp = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        admin.topics().createNonPartitionedTopic(defaultTp);
+
+        int nsCount = (maxPacketLen / 834) + 1;
+        log.info("Try to create {} namespaces", nsCount);
+        String[] nsArray = new String[nsCount];
+        String[] tpArray = new String[nsCount];
+        for (int i = 0; i < nsCount; i++) {
+            String ns = BrokerTestUtil.newUniqueName(defaultTenant + "/ns");
+            nsArray[i] = ns;
+            String tp = BrokerTestUtil.newUniqueName("persistent://" + ns + 
"/tp");
+            tpArray[i] = tp;
+            admin.namespaces().createNamespace(ns);
+            admin.topics().createPartitionedTopic(tp, 16);
+        }
+
+        int len = 
pulsar.getLocalMetadataStore().getChildren("/managed-ledgers/" + nsArray[0] + 
"/persistent").join()
+                .stream().mapToInt(str -> str.length()).sum();
+        log.info("Packet len of list topics of per namespace: {}", len);
+
+        long start = System.currentTimeMillis();
+        CompletableFuture<Void> createSubscriptionFuture = admin.topics()
+                .createSubscriptionAsync(defaultTp, "s1", MessageId.earliest);
+        for (int i = 0; i < nsCount; i++) {
+            pulsar.getLocalMetadataStore().getChildren("/managed-ledgers/" + 
nsArray[i] + "/persistent");
+        }
+        log.info("Send multi ZK operations in {} ms. If it is larger than 20, 
may can not reproduce the issue",
+                (System.currentTimeMillis() - start));
+        
client.newConsumer().topic(defaultTp).subscriptionName("s1").subscribe().close();
+        createSubscriptionFuture.get(10, TimeUnit.SECONDS);
+
+        // cleanup.
+        for (int i = 0; i < nsCount; i++) {
+            admin.topics().deletePartitionedTopic(tpArray[i]);
+        }
+        for (int i = 0; i < nsCount; i++) {
+            admin.namespaces().deleteNamespace(nsArray[i]);
+        }
+        admin.topics().delete(defaultTp);
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/DefaultMetadataNodeSizeStatsSplitPathTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/DefaultMetadataNodeSizeStatsSplitPathTest.java
new file mode 100644
index 00000000000..85d98dd7a6a
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/DefaultMetadataNodeSizeStatsSplitPathTest.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.zookeeper;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import org.testng.annotations.Test;
+
+public class DefaultMetadataNodeSizeStatsSplitPathTest {
+
+    @Test
+    public void testSplitPathWithNullInput() {
+        // Test null input returns the meaningless split path result
+        DefaultMetadataNodeSizeStats.SplitPathRes result =
+                DefaultMetadataNodeSizeStats.splitPath(null);
+
+        // Should return the static MEANINGLESS_SPLIT_PATH_RES instance
+        assertEquals(result.partCount, 0);
+        assertNull(result.parts[0]);
+        assertNull(result.parts[1]);
+    }
+
+    @Test
+    public void testSplitPathWithEmptyString() {
+        // Test empty string returns the meaningless split path result
+        DefaultMetadataNodeSizeStats.SplitPathRes result =
+                DefaultMetadataNodeSizeStats.splitPath("");
+
+        assertEquals(result.partCount, 0);
+        assertNull(result.parts[0]);
+        assertNull(result.parts[1]);
+    }
+
+    @Test
+    public void testSplitPathWithSingleCharacter() {
+        // Test single character string returns the meaningless split path 
result
+        DefaultMetadataNodeSizeStats.SplitPathRes result =
+                DefaultMetadataNodeSizeStats.splitPath("/");
+
+        assertEquals(result.partCount, 0);
+        assertNull(result.parts[0]);
+        assertNull(result.parts[1]);
+    }
+
+    @Test
+    public void testSplitPathWithSingleSlash() {
+        // Test single slash returns the meaningless split path result
+        DefaultMetadataNodeSizeStats.SplitPathRes result =
+                DefaultMetadataNodeSizeStats.splitPath("/");
+
+        assertEquals(result.partCount, 0);
+        assertNull(result.parts[0]);
+        assertNull(result.parts[1]);
+    }
+
+    @Test
+    public void testSplitPathWithTwoParts() {
+        // Test path with two parts: /admin/clusters
+        DefaultMetadataNodeSizeStats.SplitPathRes result =
+                DefaultMetadataNodeSizeStats.splitPath("/admin/clusters");
+
+        assertEquals(result.partCount, 2);
+        assertEquals(result.parts[0], "admin");
+        assertEquals(result.parts[1], "clusters");
+    }
+
+    @Test
+    public void testSplitPathWithThreeParts() {
+        // Test path with three parts: /admin/policies/tenant
+        DefaultMetadataNodeSizeStats.SplitPathRes result =
+                
DefaultMetadataNodeSizeStats.splitPath("/admin/policies/tenant");
+
+        assertEquals(result.partCount, 3);
+        assertEquals(result.parts[0], "admin");
+        assertEquals(result.parts[1], "policies");
+        // Third part should not be stored in parts array (only first two are 
kept)
+    }
+
+    @Test
+    public void testSplitPathWithFourParts() {
+        // Test path with four parts: /admin/policies/tenant/namespace
+        DefaultMetadataNodeSizeStats.SplitPathRes result =
+                
DefaultMetadataNodeSizeStats.splitPath("/admin/policies/tenant/namespace");
+
+        assertEquals(result.partCount, 4);
+        assertEquals(result.parts[0], "admin");
+        assertEquals(result.parts[1], "policies");
+        // Only first two parts are stored in the parts array
+    }
+
+    @Test
+    public void testSplitPathWithFiveParts() {
+        // Test path with five parts: /admin/policies/tenant/namespace/topic
+        DefaultMetadataNodeSizeStats.SplitPathRes result =
+                
DefaultMetadataNodeSizeStats.splitPath("/admin/policies/tenant/namespace/topic");
+
+        assertEquals(result.partCount, 5);
+        assertEquals(result.parts[0], "admin");
+        assertEquals(result.parts[1], "policies");
+    }
+
+    @Test
+    public void testSplitPathWithSixParts() {
+        // Test path with six parts: 
/admin/partitioned-topics/persistent/tenant/namespace/topic
+        DefaultMetadataNodeSizeStats.SplitPathRes result = 
DefaultMetadataNodeSizeStats
+                
.splitPath("/admin/partitioned-topics/persistent/tenant/namespace/topic");
+
+        assertEquals(result.partCount, 6);
+        assertEquals(result.parts[0], "admin");
+        assertEquals(result.parts[1], "partitioned-topics");
+    }
+
+    @Test
+    public void testSplitPathWithManagedLedgerPath() {
+        // Test managed ledger path: 
/managed-ledgers/tenant/namespace/persistent/topic
+        DefaultMetadataNodeSizeStats.SplitPathRes result =
+                
DefaultMetadataNodeSizeStats.splitPath("/managed-ledgers/tenant/namespace/persistent/topic");
+
+        assertEquals(result.partCount, 5);
+        assertEquals(result.parts[0], "managed-ledgers");
+        assertEquals(result.parts[1], "tenant");
+    }
+
+    @Test
+    public void testSplitPathWithSubscriptionPath() {
+        // Test subscription path: 
/managed-ledgers/tenant/namespace/persistent/topic/subscription
+        DefaultMetadataNodeSizeStats.SplitPathRes result = 
DefaultMetadataNodeSizeStats
+                
.splitPath("/managed-ledgers/tenant/namespace/persistent/topic/subscription");
+
+        assertEquals(result.partCount, 6);
+        assertEquals(result.parts[0], "managed-ledgers");
+        assertEquals(result.parts[1], "tenant");
+    }
+
+    @Test
+    public void testSplitPathWithTrailingSlash() {
+        // Test path with trailing slash: /admin/clusters/
+        DefaultMetadataNodeSizeStats.SplitPathRes result =
+                DefaultMetadataNodeSizeStats.splitPath("/admin/clusters/");
+
+        assertEquals(result.partCount, 2);
+        assertEquals(result.parts[0], "admin");
+        assertEquals(result.parts[1], "clusters");
+    }
+
+    @Test
+    public void testSplitPathWithMultipleTrailingSlashes() {
+        // Test path with multiple trailing slashes: /admin/clusters///
+        DefaultMetadataNodeSizeStats.SplitPathRes result =
+                DefaultMetadataNodeSizeStats.splitPath("/admin/clusters///");
+
+        assertEquals(result.partCount, 2);
+        assertEquals(result.parts[0], "admin");
+        assertEquals(result.parts[1], "clusters");
+    }
+
+    @Test
+    public void testSplitPathWithConsecutiveSlashes() {
+        // Test path with consecutive slashes: /admin//clusters
+        DefaultMetadataNodeSizeStats.SplitPathRes result =
+                DefaultMetadataNodeSizeStats.splitPath("/admin//clusters");
+
+        assertEquals(result.partCount, 2);
+        assertEquals(result.parts[0], "admin");
+        assertEquals(result.parts[1], "clusters");
+    }
+
+    @Test
+    public void testSplitPathWithLeadingSlashes() {
+        // Test path with multiple leading slashes: ///admin/clusters
+        DefaultMetadataNodeSizeStats.SplitPathRes result =
+                DefaultMetadataNodeSizeStats.splitPath("///admin/clusters");
+
+        assertEquals(result.partCount, 2);
+        assertEquals(result.parts[0], "admin");
+        assertEquals(result.parts[1], "clusters");
+    }
+
+    @Test
+    public void testSplitPathWithSinglePart() {
+        // Test path with single part: /admin
+        DefaultMetadataNodeSizeStats.SplitPathRes result =
+                DefaultMetadataNodeSizeStats.splitPath("/admin");
+
+        assertEquals(result.partCount, 1);
+        assertEquals(result.parts[0], "admin");
+        assertNull(result.parts[1]);
+    }
+
+    @Test
+    public void testSplitPathWithEmptyParts() {
+        // Test path with empty parts: /admin//clusters//tenant
+        DefaultMetadataNodeSizeStats.SplitPathRes result =
+                
DefaultMetadataNodeSizeStats.splitPath("/admin//clusters//tenant");
+
+        assertEquals(result.partCount, 3);
+        assertEquals(result.parts[0], "admin");
+        assertEquals(result.parts[1], "clusters");
+    }
+
+    @Test
+    public void testSplitPathWithSpecialCharacters() {
+        // Test path with special characters: /admin-test/clusters_test
+        DefaultMetadataNodeSizeStats.SplitPathRes result =
+                
DefaultMetadataNodeSizeStats.splitPath("/admin-test/clusters_test");
+
+        assertEquals(result.partCount, 2);
+        assertEquals(result.parts[0], "admin-test");
+        assertEquals(result.parts[1], "clusters_test");
+    }
+
+    @Test
+    public void testSplitPathWithNumbers() {
+        // Test path with numbers: /admin123/clusters456/tenant789
+        DefaultMetadataNodeSizeStats.SplitPathRes result =
+                
DefaultMetadataNodeSizeStats.splitPath("/admin123/clusters456/tenant789");
+
+        assertEquals(result.partCount, 3);
+        assertEquals(result.parts[0], "admin123");
+        assertEquals(result.parts[1], "clusters456");
+    }
+
+    @Test
+    public void testSplitPathWithLongPath() {
+        // Test very long path with many parts
+        String longPath = 
"/part1/part2/part3/part4/part5/part6/part7/part8/part9/part10";
+        DefaultMetadataNodeSizeStats.SplitPathRes result =
+                DefaultMetadataNodeSizeStats.splitPath(longPath);
+
+        assertEquals(result.partCount, 10);
+        assertEquals(result.parts[0], "part1");
+        assertEquals(result.parts[1], "part2");
+        // Only first two parts are stored
+    }
+
+    @Test
+    public void testSplitPathWithShortValidPath() {
+        // Test shortest valid path: /a
+        DefaultMetadataNodeSizeStats.SplitPathRes result =
+                DefaultMetadataNodeSizeStats.splitPath("/a");
+
+        assertEquals(result.partCount, 1);
+        assertEquals(result.parts[0], "a");
+        assertNull(result.parts[1]);
+    }
+
+    @Test
+    public void testSplitPathWithTwoCharacterPath() {
+        // Test two character path: /ab
+        DefaultMetadataNodeSizeStats.SplitPathRes result =
+                DefaultMetadataNodeSizeStats.splitPath("/ab");
+
+        assertEquals(result.partCount, 1);
+        assertEquals(result.parts[0], "ab");
+        assertNull(result.parts[1]);
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/DefaultMetadataNodeSizeStatsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/DefaultMetadataNodeSizeStatsTest.java
new file mode 100644
index 00000000000..bfbed2324d1
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/DefaultMetadataNodeSizeStatsTest.java
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.zookeeper;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.Stat;
+import org.mockito.Mockito;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Comprehensive test suite for DefaultMetadataNodeSizeStats class.
+ *
+ * This test covers all the functionality of the DefaultMetadataNodeSizeStats 
class including:
+ * - Path type classification for different Pulsar metadata paths
+ * - Size tracking for put and get operations
+ * - Children count tracking for list operations
+ * - Proper handling of edge cases and null values
+ * - Path splitting functionality
+ */
+public class DefaultMetadataNodeSizeStatsTest {
+
+    private DefaultMetadataNodeSizeStats stats;
+
+    @BeforeMethod
+    public void setUp() {
+        stats = new DefaultMetadataNodeSizeStats();
+    }
+
+    @Test
+    public void testRecordPutAndGetMaxSize() {
+        // Test with cluster path
+        String clusterPath = "/admin/clusters/test-cluster";
+        byte[] smallData = "small".getBytes(StandardCharsets.UTF_8);
+        byte[] largeData = "this is a much larger data 
payload".getBytes(StandardCharsets.UTF_8);
+
+        // Initially should return UNSET (-1)
+        assertEquals(stats.getMaxSizeOfSameResourceType(clusterPath), 
DefaultMetadataNodeSizeStats.UNSET);
+
+        // Record small data first
+        stats.recordPut(clusterPath, smallData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(clusterPath), 
smallData.length);
+
+        // Record larger data - should update the max
+        stats.recordPut(clusterPath, largeData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(clusterPath), 
largeData.length);
+
+        // Record smaller data again - should not change the max
+        stats.recordPut(clusterPath, smallData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(clusterPath), 
largeData.length);
+    }
+
+    @Test
+    public void testRecordGetResAndGetMaxSize() {
+        String tenantPath = "/admin/policies/test-tenant";
+        byte[] data1 = "data1".getBytes(StandardCharsets.UTF_8);
+        byte[] data2 = "much longer data 
payload".getBytes(StandardCharsets.UTF_8);
+
+        // Create mock GetResult objects
+        Stat mockStat = Mockito.mock(Stat.class);
+        GetResult getResult1 = new GetResult(data1, mockStat);
+        GetResult getResult2 = new GetResult(data2, mockStat);
+
+        // Initially should return UNSET (-1)
+        assertEquals(stats.getMaxSizeOfSameResourceType(tenantPath), 
DefaultMetadataNodeSizeStats.UNSET);
+
+        // Record first result
+        stats.recordGetRes(tenantPath, getResult1);
+        assertEquals(stats.getMaxSizeOfSameResourceType(tenantPath), 
data1.length);
+
+        // Record larger result - should update the max
+        stats.recordGetRes(tenantPath, getResult2);
+        assertEquals(stats.getMaxSizeOfSameResourceType(tenantPath), 
data2.length);
+
+        // Record smaller result again - should not change the max
+        stats.recordGetRes(tenantPath, getResult1);
+        assertEquals(stats.getMaxSizeOfSameResourceType(tenantPath), 
data2.length);
+    }
+
+    @Test
+    public void testRecordGetChildrenResAndGetMaxChildrenCount() {
+        String namespacePath = "/admin/policies/test-tenant/test-namespace";
+        List<String> smallList = Arrays.asList("a", "b", "c");
+        List<String> largeList = Arrays.asList("longer-name-1", 
"longer-name-2", "longer-name-3", "longer-name-4");
+
+        // Initially should return UNSET (-1)
+        
assertEquals(stats.getMaxChildrenCountOfSameResourceType(namespacePath), 
DefaultMetadataNodeSizeStats.UNSET);
+
+        // Record small list first - should track the count, not the total 
length
+        stats.recordGetChildrenRes(namespacePath, smallList);
+        
assertEquals(stats.getMaxChildrenCountOfSameResourceType(namespacePath), 
smallList.size());
+
+        // Record larger list - should update the max count
+        stats.recordGetChildrenRes(namespacePath, largeList);
+        
assertEquals(stats.getMaxChildrenCountOfSameResourceType(namespacePath), 
largeList.size());
+
+        // Record smaller list again - should not change the max
+        stats.recordGetChildrenRes(namespacePath, smallList);
+        
assertEquals(stats.getMaxChildrenCountOfSameResourceType(namespacePath), 
largeList.size());
+    }
+
+    @Test
+    public void testUnknownPathsReturnMinusOne() {
+        // Test that unknown paths return -1 (not UNSET from internal storage)
+        String unknownPath = "/some/unknown/path";
+        assertEquals(stats.getMaxSizeOfSameResourceType(unknownPath), -1);
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(unknownPath), 
-1);
+
+        // Test that recording data for unknown paths is ignored
+        byte[] data = "test-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(unknownPath, data);
+        assertEquals(stats.getMaxSizeOfSameResourceType(unknownPath), -1);
+
+        // Test that recording children for unknown paths is ignored
+        List<String> children = Arrays.asList("child1", "child2");
+        stats.recordGetChildrenRes(unknownPath, children);
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(unknownPath), 
-1);
+    }
+
+    @Test
+    public void testNullGetResult() {
+        String testPath = "/admin/clusters/test";
+
+        // Recording null GetResult should not affect the stats
+        stats.recordGetRes(testPath, null);
+        assertEquals(stats.getMaxSizeOfSameResourceType(testPath), 
DefaultMetadataNodeSizeStats.UNSET);
+
+        // Record valid data first
+        byte[] data = "test-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(testPath, data);
+        assertEquals(stats.getMaxSizeOfSameResourceType(testPath), 
data.length);
+
+        // Recording null GetResult should not change the existing max
+        stats.recordGetRes(testPath, null);
+        assertEquals(stats.getMaxSizeOfSameResourceType(testPath), 
data.length);
+    }
+
+    @Test
+    public void testAdminPathTypeClassification() {
+        // Test cluster paths
+        String clusterPath = "/admin/clusters/test-cluster";
+        byte[] clusterData = "cluster-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(clusterPath, clusterData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(clusterPath), 
clusterData.length);
+
+        // Test tenant paths (policies with 2 parts)
+        String tenantPath = "/admin/policies/test-tenant";
+        byte[] tenantData = "tenant-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(tenantPath, tenantData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(tenantPath), 
tenantData.length);
+
+        // Test namespace policy paths (policies with 3 parts)
+        String namespacePath = "/admin/policies/test-tenant/test-namespace";
+        byte[] namespaceData = 
"namespace-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(namespacePath, namespaceData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(namespacePath), 
namespaceData.length);
+
+        // Test local policies paths
+        String localPoliciesPath = 
"/admin/local-policies/test-tenant/test-namespace";
+        byte[] localPoliciesData = 
"local-policies-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(localPoliciesPath, localPoliciesData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(localPoliciesPath), 
localPoliciesData.length);
+
+        // Test partitioned namespace paths (5 parts)
+        String partitionedNamespacePath = 
"/admin/partitioned-topics/persistent/test-tenant/test-namespace";
+        byte[] partitionedNamespaceData = 
"partitioned-namespace-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(partitionedNamespacePath, partitionedNamespaceData);
+        
assertEquals(stats.getMaxSizeOfSameResourceType(partitionedNamespacePath), 
partitionedNamespaceData.length);
+
+        // Test partitioned topic paths (6 parts)
+        String partitionedTopicPath = 
"/admin/partitioned-topics/persistent/test-tenant/test-namespace/test-topic";
+        byte[] partitionedTopicData = 
"partitioned-topic-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(partitionedTopicPath, partitionedTopicData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(partitionedTopicPath), 
partitionedTopicData.length);
+
+        // Verify that different path types maintain separate max values
+        assertNotEquals(stats.getMaxSizeOfSameResourceType(clusterPath),
+                       stats.getMaxSizeOfSameResourceType(tenantPath));
+        assertNotEquals(stats.getMaxSizeOfSameResourceType(tenantPath),
+                       stats.getMaxSizeOfSameResourceType(namespacePath));
+    }
+
+    @Test
+    public void testManagedLedgerPathTypes() {
+        // Test ML namespace paths (4 parts)
+        String mlNamespacePath = 
"/managed-ledgers/tenant/namespace/persistent";
+        byte[] mlNamespaceData = 
"ml-namespace-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(mlNamespacePath, mlNamespaceData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(mlNamespacePath), 
mlNamespaceData.length);
+
+        // Test topic paths (5 parts)
+        String topicPath = 
"/managed-ledgers/tenant/namespace/persistent/topic";
+        byte[] topicData = "topic-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(topicPath, topicData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(topicPath), 
topicData.length);
+
+        // Test v2 subscription paths (6 parts)
+        String v2SubscriptionPath = 
"/managed-ledgers/tenant/namespace/persistent/topic/subscription";
+        byte[] v2SubscriptionData = 
"v2-subscription-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(v2SubscriptionPath, v2SubscriptionData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(v2SubscriptionPath), 
v2SubscriptionData.length);
+
+        // Test v1 subscription paths (7 parts)
+        String v1SubscriptionPath = 
"/managed-ledgers/tenant/cluster/namespace/persistent/topic/subscription";
+        byte[] v1SubscriptionData = 
"v1-subscription-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(v1SubscriptionPath, v1SubscriptionData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(v1SubscriptionPath), 
v1SubscriptionData.length);
+
+        // Verify that v2 and v1 subscriptions use the same path type 
(SUBSCRIPTION)
+        assertEquals(stats.getMaxSizeOfSameResourceType(v2SubscriptionPath),
+                    Math.max(v2SubscriptionData.length, 
v1SubscriptionData.length));
+        assertEquals(stats.getMaxSizeOfSameResourceType(v1SubscriptionPath),
+                    Math.max(v2SubscriptionData.length, 
v1SubscriptionData.length));
+
+        // Verify different path types maintain separate max values
+        assertNotEquals(stats.getMaxSizeOfSameResourceType(mlNamespacePath),
+                       stats.getMaxSizeOfSameResourceType(topicPath));
+        assertNotEquals(stats.getMaxSizeOfSameResourceType(topicPath),
+                       stats.getMaxSizeOfSameResourceType(v2SubscriptionPath));
+    }
+
+    @Test
+    public void testLoadBalancePathTypes() {
+        // Test brokers path (2 parts)
+        String brokersPath = "/loadbalance/brokers";
+        byte[] brokersData = "brokers-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(brokersPath, brokersData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(brokersPath), 
brokersData.length);
+
+        // Test broker path (3 parts)
+        String brokerPath = "/loadbalance/brokers/broker1";
+        byte[] brokerData = "broker-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(brokerPath, brokerData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(brokerPath), 
brokerData.length);
+
+        // Test bundle namespace path (4 parts)
+        String bundleNamespacePath = 
"/loadbalance/bundle-data/tenant/namespace";
+        byte[] bundleNamespaceData = 
"bundle-namespace-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(bundleNamespacePath, bundleNamespaceData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(bundleNamespacePath), 
bundleNamespaceData.length);
+
+        // Test bundle data path (5 parts)
+        String bundleDataPath = 
"/loadbalance/bundle-data/tenant/namespace/bundle";
+        byte[] bundleData = "bundle-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(bundleDataPath, bundleData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(bundleDataPath), 
bundleData.length);
+
+        // Test broker time average path (3 parts)
+        String brokerTimeAvgPath = "/loadbalance/broker-time-average/broker1";
+        byte[] brokerTimeAvgData = 
"broker-time-avg-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(brokerTimeAvgPath, brokerTimeAvgData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(brokerTimeAvgPath), 
brokerTimeAvgData.length);
+
+        // Test leader path
+        String leaderPath = "/loadbalance/leader";
+        byte[] leaderData = "leader-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(leaderPath, leaderData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(leaderPath), 
leaderData.length);
+
+        // Verify different path types maintain separate max values
+        assertNotEquals(stats.getMaxSizeOfSameResourceType(brokersPath),
+                       stats.getMaxSizeOfSameResourceType(brokerPath));
+    }
+
+    @Test
+    public void testBundleOwnerPathTypes() {
+        // Test bundle owner namespace path (3 parts)
+        String bundleOwnerNamespacePath = "/namespace/tenant/namespace";
+        byte[] bundleOwnerNamespaceData = 
"bundle-owner-namespace-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(bundleOwnerNamespacePath, bundleOwnerNamespaceData);
+        
assertEquals(stats.getMaxSizeOfSameResourceType(bundleOwnerNamespacePath), 
bundleOwnerNamespaceData.length);
+
+        // Test bundle owner path (4 parts)
+        String bundleOwnerPath = "/namespace/tenant/namespace/bundle";
+        byte[] bundleOwnerData = 
"bundle-owner-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(bundleOwnerPath, bundleOwnerData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(bundleOwnerPath), 
bundleOwnerData.length);
+
+        // Verify different path types maintain separate max values
+        
assertNotEquals(stats.getMaxSizeOfSameResourceType(bundleOwnerNamespacePath),
+                       stats.getMaxSizeOfSameResourceType(bundleOwnerPath));
+    }
+
+    @Test
+    public void testSchemaPathTypes() {
+        // Test topic schema path (4 parts)
+        String schemaPath = "/schemas/tenant/namespace/topic";
+        byte[] schemaData = "schema-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(schemaPath, schemaData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(schemaPath), 
schemaData.length);
+
+        // Test invalid schema path (3 parts) - should be UNKNOWN
+        String invalidSchemaPath = "/schemas/tenant/namespace";
+        byte[] invalidSchemaData = 
"invalid-schema-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(invalidSchemaPath, invalidSchemaData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(invalidSchemaPath), 
-1);
+
+        // Verify they use different path types
+        assertNotEquals(stats.getMaxSizeOfSameResourceType(schemaPath),
+                       stats.getMaxSizeOfSameResourceType(invalidSchemaPath));
+    }
+
+    @Test
+    public void testShortAndUnknownPaths() {
+        // Test paths that are too short to be classified
+        String shortPath1 = "/";
+        String shortPath2 = "/admin";
+
+        byte[] data = "test-data".getBytes(StandardCharsets.UTF_8);
+
+        // Recording data for unknown paths should be ignored
+        stats.recordPut(shortPath1, data);
+        stats.recordPut(shortPath2, data);
+
+        // Both should return -1 since they are UNKNOWN path types
+        assertEquals(stats.getMaxSizeOfSameResourceType(shortPath1), -1);
+        assertEquals(stats.getMaxSizeOfSameResourceType(shortPath2), -1);
+
+        // Test unknown admin paths
+        String unknownAdminPath = "/admin/unknown/path";
+        byte[] unknownData = 
"unknown-admin-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(unknownAdminPath, unknownData);
+
+        // Should return -1 since it's an UNKNOWN path type
+        assertEquals(stats.getMaxSizeOfSameResourceType(unknownAdminPath), -1);
+
+        // Test children recording for unknown paths
+        List<String> children = Arrays.asList("child1", "child2");
+        stats.recordGetChildrenRes(shortPath1, children);
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(shortPath1), 
-1);
+    }
+
+    @Test
+    public void testEmptyAndZeroSizeData() {
+        String testPath = "/admin/clusters/test";
+
+        // Test with empty data
+        byte[] emptyData = new byte[0];
+        stats.recordPut(testPath, emptyData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(testPath), 0);
+
+        // Test with empty list
+        List<String> emptyList = Collections.emptyList();
+        stats.recordGetChildrenRes(testPath, emptyList);
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(testPath), 0);
+
+        // Record larger data - should update the max
+        byte[] largerData = "larger-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(testPath, largerData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(testPath), 
largerData.length);
+
+        // Record larger list - should update the max
+        List<String> largerList = Arrays.asList("item1", "item2", "item3");
+        stats.recordGetChildrenRes(testPath, largerList);
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(testPath), 
largerList.size());
+    }
+
+    @Test
+    public void testMixedOperationsOnSamePath() {
+        String testPath = "/admin/policies/test-tenant";
+
+        // Record via put
+        byte[] putData = "put-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(testPath, putData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(testPath), 
putData.length);
+
+        // Record via get with larger data - should update max
+        byte[] largerData = "much larger get result 
data".getBytes(StandardCharsets.UTF_8);
+        Stat mockStat = Mockito.mock(Stat.class);
+        GetResult getResult = new GetResult(largerData, mockStat);
+        stats.recordGetRes(testPath, getResult);
+        assertEquals(stats.getMaxSizeOfSameResourceType(testPath), 
largerData.length);
+
+        // Record via put with smaller data - should not change max
+        byte[] smallerData = "small".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(testPath, smallerData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(testPath), 
largerData.length);
+
+        // Test children count operations
+        List<String> smallList = Arrays.asList("a", "b");
+        stats.recordGetChildrenRes(testPath, smallList);
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(testPath), 
smallList.size());
+
+        List<String> largerList = Arrays.asList("a", "b", "c", "d", "e");
+        stats.recordGetChildrenRes(testPath, largerList);
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(testPath), 
largerList.size());
+    }
+
+    @Test
+    public void testSplitPathFunctionality() {
+        // Test the static splitPath method indirectly through path type 
classification
+
+        // Test path with trailing slashes
+        String pathWithTrailingSlash = "/admin/clusters/test-cluster/";
+        byte[] data = "test-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(pathWithTrailingSlash, data);
+        
assertEquals(stats.getMaxSizeOfSameResourceType(pathWithTrailingSlash), 
data.length);
+
+        // Should be classified the same as path without trailing slash
+        String pathWithoutTrailingSlash = "/admin/clusters/test-cluster";
+        
assertEquals(stats.getMaxSizeOfSameResourceType(pathWithoutTrailingSlash), 
data.length);
+
+        // Test path with multiple consecutive slashes
+        String pathWithMultipleSlashes = "/admin//clusters//test-cluster";
+        
assertEquals(stats.getMaxSizeOfSameResourceType(pathWithMultipleSlashes), 
data.length);
+    }
+
+    @Test
+    public void testUnknownPathTypesAreIgnored() {
+        // Test that UNKNOWN path types don't affect any statistics
+        String[] unknownPaths = {
+            "/",
+            "/admin",
+            "/unknown/path",
+            "/admin/unknown/path/type",
+            "/managed-ledgers", // too short
+            "/loadbalance", // too short
+            "/schemas/tenant", // too short (needs 4 parts)
+            "/admin/policies/tenant/namespace/extra", // too long for policies
+            "/admin/partitioned-topics/persistent", // too short for 
partitioned topics
+            
"/admin/partitioned-topics/persistent/tenant/namespace/topic/extra" // too long
+        };
+
+        byte[] data = "test-data".getBytes(StandardCharsets.UTF_8);
+        List<String> children = Arrays.asList("child1", "child2", "child3");
+
+        for (String unknownPath : unknownPaths) {
+            // Record operations should be ignored
+            stats.recordPut(unknownPath, data);
+            stats.recordGetChildrenRes(unknownPath, children);
+
+            // Should always return -1
+            assertEquals(stats.getMaxSizeOfSameResourceType(unknownPath), -1,
+                        "Path should return -1: " + unknownPath);
+            
assertEquals(stats.getMaxChildrenCountOfSameResourceType(unknownPath), -1,
+                        "Path should return -1: " + unknownPath);
+        }
+    }
+
+    @Test
+    public void testEmptyAndNullListHandling() {
+        String testPath = "/admin/clusters/test";
+
+        // Test with null list (should be handled gracefully)
+        stats.recordGetChildrenRes(testPath, null);
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(testPath), 0);
+
+        // Test with empty list
+        List<String> emptyList = Collections.emptyList();
+        stats.recordGetChildrenRes(testPath, emptyList);
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(testPath), 0);
+
+        // Test with non-empty list - should update the max
+        List<String> nonEmptyList = Arrays.asList("item1", "item2", "item3");
+        stats.recordGetChildrenRes(testPath, nonEmptyList);
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(testPath), 
nonEmptyList.size());
+
+        // Test with empty list again - should not change the max
+        stats.recordGetChildrenRes(testPath, emptyList);
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(testPath), 
nonEmptyList.size());
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/DummyMetadataNodeSizeStats.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/DummyMetadataNodeSizeStats.java
new file mode 100644
index 00000000000..ba60c662588
--- /dev/null
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/DummyMetadataNodeSizeStats.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.api;
+
+import java.util.List;
+
+/**
+ * No-op {@link MetadataNodeSizeStats}: nothing is recorded and no size information is ever available.
+ *
+ * <p>{@code AbstractMetadataStore} falls back to this implementation when it is constructed without a stats
+ * instance.
+ */
+public class DummyMetadataNodeSizeStats implements MetadataNodeSizeStats {
+
+    @Override
+    public void recordPut(String path, byte[] data) {}
+
+    @Override
+    public void recordGetRes(String path, GetResult getResult) {}
+
+    @Override
+    public void recordGetChildrenRes(String path, List<String> list) {}
+
+    /**
+     * Always reports {@code -1} ("unset") because nothing is ever recorded; any non-negative value would look
+     * like a measured payload size.
+     */
+    @Override
+    public int getMaxSizeOfSameResourceType(String path) {
+        return -1;
+    }
+
+    /**
+     * Always reports {@code -1} ("unset") because nothing is ever recorded; any non-negative value would look
+     * like a measured children count.
+     */
+    @Override
+    public int getMaxChildrenCountOfSameResourceType(String path) {
+        return -1;
+    }
+}
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataNodeSizeStats.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataNodeSizeStats.java
new file mode 100644
index 00000000000..92d6084b46c
--- /dev/null
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataNodeSizeStats.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.api;
+
+import java.util.List;
+
+/**
+ * Tracks the largest payload size and children count observed for metadata nodes. It is useful in two cases:
+ * 1. Bounding the response size of batched metadata store queries. For example, the ZK client limits a response
+ *    packet to 1MB by default; when a response exceeds that limit, the ZK client throws the error
+ *    "Packet len {len} is out of range!" and reconnects.
+ * 2. Exposing metrics about the payload size of metadata nodes.
+ */
+public interface MetadataNodeSizeStats {
+
+    /**
+     * Records the payload length of {@code data} written to {@code path} by a put operation.
+     */
+    void recordPut(String path, byte[] data);
+
+    /**
+     * Records the payload length of {@code getResult}, the result of a get operation on {@code path}.
+     */
+    void recordGetRes(String path, GetResult getResult);
+
+    /**
+     * Records {@code list}, the result of listing the children of {@code path}.
+     */
+    void recordGetChildrenRes(String path, List<String> list);
+
+    /**
+     * Returns the max payload size recorded for nodes of the same resource type as {@code path}.
+     */
+    int getMaxSizeOfSameResourceType(String path);
+
+    /**
+     * Returns the max children count recorded for nodes of the same resource type as {@code path}.
+     */
+    int getMaxChildrenCountOfSameResourceType(String path);
+}
\ No newline at end of file
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
index be29f843eea..ef50dc87691 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
@@ -99,4 +99,9 @@ public class MetadataStoreConfig {
      */
     @Builder.Default
     private OpenTelemetry openTelemetry = OpenTelemetry.noop();
+
+    /**
+     * Estimator of the payload length of metadata nodes, which is used to limit the size of requested batches.
+     */
+    private MetadataNodeSizeStats nodeSizeStats;
 }
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index 506df8b631d..b0e4b43f700 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -51,11 +51,13 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.stats.CacheMetricsCollector;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.DummyMetadataNodeSizeStats;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataCacheConfig;
 import org.apache.pulsar.metadata.api.MetadataEvent;
 import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
+import org.apache.pulsar.metadata.api.MetadataNodeSizeStats;
 import org.apache.pulsar.metadata.api.MetadataSerde;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.Notification;
@@ -89,7 +91,12 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
 
     protected abstract CompletableFuture<Boolean> existsFromStore(String path);
 
-    protected AbstractMetadataStore(String metadataStoreName, OpenTelemetry 
openTelemetry) {
+    protected MetadataNodeSizeStats nodeSizeStats;
+
+    protected AbstractMetadataStore(String metadataStoreName, OpenTelemetry 
openTelemetry,
+                MetadataNodeSizeStats nodeSizeStats) {
+        this.nodeSizeStats = nodeSizeStats == null ? new 
DummyMetadataNodeSizeStats()
+                : nodeSizeStats;
         this.executor = new ScheduledThreadPoolExecutor(1,
                 new DefaultThreadFactory(
                         StringUtils.isNotBlank(metadataStoreName) ? 
metadataStoreName : getClass().getSimpleName()));
@@ -280,6 +287,8 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
         return storeGet(path)
                 .whenComplete((v, t) -> {
                     if (t != null) {
                         metadataStoreStats.recordGetOpsFailed(System.currentTimeMillis() - start);
                     } else {
+                        // v is null when the get failed, so only record the node size on the success path.
+                        v.ifPresent(getResult -> nodeSizeStats.recordGetRes(path, getResult));
                         metadataStoreStats.recordGetOpsSucceeded(System.currentTimeMillis() - start);
@@ -302,7 +310,11 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
         if (!isValidPath(path)) {
             return FutureUtil.failedFuture(new 
MetadataStoreException.InvalidPathException(path));
         }
-        return childrenCache.get(path);
+        CompletableFuture<List<String>> listFuture = childrenCache.get(path);
+        listFuture.thenAccept((list) -> {
+            nodeSizeStats.recordGetChildrenRes(path, list);
+        });
+        return listFuture;
     }
 
     @Override
@@ -488,6 +500,7 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
                     NotificationType type = stat.isFirstVersion() ? 
NotificationType.Created
                             : NotificationType.Modified;
                     if (type == NotificationType.Created) {
+                        nodeSizeStats.recordPut(path, data);
                         existsCache.synchronous().invalidate(path);
                         String parent = parent(path);
                         if (parent != null) {
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
index e95f1947740..079cb3130e0 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
@@ -78,7 +78,8 @@ public class LocalMemoryMetadataStore extends 
AbstractMetadataStore implements M
 
     public LocalMemoryMetadataStore(String metadataURL, MetadataStoreConfig 
metadataStoreConfig)
             throws MetadataStoreException {
-        super(metadataStoreConfig.getMetadataStoreName(), 
metadataStoreConfig.getOpenTelemetry());
+        super(metadataStoreConfig.getMetadataStoreName(), 
metadataStoreConfig.getOpenTelemetry(),
+                metadataStoreConfig.getNodeSizeStats());
         String name = metadataURL.substring(MEMORY_SCHEME_IDENTIFIER.length());
         // Local means a private data set
         // update synchronizer and register sync listener
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
index 752fc7153cf..74bddda7454 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
@@ -209,7 +209,8 @@ public class RocksdbMetadataStore extends 
AbstractMetadataStore {
      */
     private RocksdbMetadataStore(String metadataURL, MetadataStoreConfig 
metadataStoreConfig)
             throws MetadataStoreException {
-        super(metadataStoreConfig.getMetadataStoreName(), 
metadataStoreConfig.getOpenTelemetry());
+        super(metadataStoreConfig.getMetadataStoreName(), 
metadataStoreConfig.getOpenTelemetry(),
+                metadataStoreConfig.getNodeSizeStats());
         this.metadataUrl = metadataURL;
         try {
             RocksDB.loadLibrary();
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index d3065fcaae2..5bf7e2272f0 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -31,7 +31,7 @@ import java.util.stream.Collectors;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.tuple.Triple;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataStore;
@@ -54,6 +54,7 @@ import org.apache.pulsar.metadata.impl.batching.OpDelete;
 import org.apache.pulsar.metadata.impl.batching.OpGet;
 import org.apache.pulsar.metadata.impl.batching.OpGetChildren;
 import org.apache.pulsar.metadata.impl.batching.OpPut;
+import org.apache.pulsar.metadata.impl.batching.ZKMetadataStoreBatchStrategy;
 import org.apache.zookeeper.AddWatchMode;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
@@ -110,6 +111,7 @@ public class ZKMetadataStore extends 
AbstractBatchedMetadataStore
                 sessionWatcher = null;
             }
             zkc.addWatch("/", eventWatcher, AddWatchMode.PERSISTENT_RECURSIVE);
+            initBatchStrategy();
         } catch (Throwable t) {
             throw new MetadataStoreException(t);
         }
@@ -144,6 +146,14 @@ public class ZKMetadataStore extends 
AbstractBatchedMetadataStore
         this.zkc = zkc;
         this.sessionWatcher = new ZKSessionWatcher(zkc, 
this::receivedSessionEvent);
         zkc.addWatch("/", eventWatcher, AddWatchMode.PERSISTENT_RECURSIVE);
+        initBatchStrategy();
+    }
+
+    /**
+     * Replaces the batch strategy inherited from the parent class with a ZooKeeper specific one.
+     *
+     * <p>The strategy is built from the node size statistics, the operation limit, the currently configured
+     * max size and the ZooKeeper client. Afterwards {@code maxSize} is overwritten with the value the new
+     * strategy reports through {@code maxSize()}.
+     */
+    private void initBatchStrategy() {
+        ZKMetadataStoreBatchStrategy zkStrategy =
+                new ZKMetadataStoreBatchStrategy(nodeSizeStats, maxOperations, maxSize, zkc);
+        super.metadataStoreBatchStrategy = zkStrategy;
+        this.maxSize = zkStrategy.maxSize();
+    }
 
     @Override
@@ -204,18 +214,34 @@ public class ZKMetadataStore extends 
AbstractBatchedMetadataStore
 
                         // Build the log warning message
                         // summarize the operations by type
+                        final int logThresholdPut = maxSize >> 4;
+                        final int logThresholdGet = maxSize >> 4;
                         String countsByType = ops.stream().collect(
                                         
Collectors.groupingBy(MetadataOp::getType, Collectors.summingInt(op -> 1)))
-                                .entrySet().stream().map(e -> e.getValue() + " 
" + e.getKey().name() + " entries")
+                                .entrySet().stream().map(e -> e.getValue() + " 
" + e.getKey().name())
                                 .collect(Collectors.joining(", "));
-                        List<Pair> opsForLog = ops.stream()
-                                .filter(item -> item.size() > 256 * 1024)
-                                .map(op -> Pair.of(op.getPath(), op.size()))
+                        boolean shouldLimitLogLen = ops.size() > 16;
+                        List<Triple<String, String, Integer>> opsForLog = 
ops.stream()
+                                .filter(item -> {
+                                    if (!shouldLimitLogLen) {
+                                        return true;
+                                    }
+                                    return switch (item.getType()) {
+                                        case PUT -> 
item.asPut().getData().length > logThresholdPut;
+                                        case GET -> nodeSizeStats
+                                                
.getMaxSizeOfSameResourceType(item.getPath()) > logThresholdGet;
+                                        case GET_CHILDREN -> nodeSizeStats
+                                                
.getMaxChildrenCountOfSameResourceType(item.getPath()) > 512;
+                                        default -> false;
+                                    };
+                                })
+                                .map(op -> Triple.of(op.getPath(), 
op.getType().toString(), op.size()))
                                 .collect(Collectors.toList());
                         Long totalSize = 
ops.stream().collect(Collectors.summingLong(MetadataOp::size));
                         log.warn("Connection loss while executing batch 
operation of {} "
-                                + "of total data size of {}. "
-                                + "Retrying individual operations one-by-one. 
ops whose size > 256KB: {}",
+                                + "of total requested data size of {}. "
+                                + "Retrying individual operations one-by-one."
+                                + " ops that maybe large: {}",
                                 countsByType, totalSize, opsForLog);
 
                         // Retry with the individual operations
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
index 865213643c3..a9319a50fec 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.metadata.impl.batching;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
@@ -28,6 +27,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
@@ -50,13 +50,14 @@ public abstract class AbstractBatchedMetadataStore extends 
AbstractMetadataStore
 
     private final boolean enabled;
     private final int maxDelayMillis;
-    private final int maxOperations;
-    private final int maxSize;
+    protected final int maxOperations;
+    protected int maxSize;
     private MetadataEventSynchronizer synchronizer;
     private final BatchMetadataStoreStats batchMetadataStoreStats;
+    protected MetadataStoreBatchStrategy metadataStoreBatchStrategy;
 
     protected AbstractBatchedMetadataStore(MetadataStoreConfig conf) {
-        super(conf.getMetadataStoreName(), conf.getOpenTelemetry());
+        super(conf.getMetadataStoreName(), conf.getOpenTelemetry(), 
conf.getNodeSizeStats());
 
         this.enabled = conf.isBatchingEnabled();
         this.maxDelayMillis = conf.getBatchingMaxDelayMillis();
@@ -78,6 +79,7 @@ public abstract class AbstractBatchedMetadataStore extends 
AbstractMetadataStore
         updateMetadataEventSynchronizer(conf.getSynchronizer());
         this.batchMetadataStoreStats =
                 new BatchMetadataStoreStats(metadataStoreName, executor, 
conf.getOpenTelemetry());
+        this.metadataStoreBatchStrategy = new 
DefaultMetadataStoreBatchStrategy(maxOperations, maxSize);
     }
 
     @Override
@@ -100,37 +102,16 @@ public abstract class AbstractBatchedMetadataStore 
extends AbstractMetadataStore
     }
 
     private void flush() {
-        while (!readOps.isEmpty()) {
-            List<MetadataOp> ops = new ArrayList<>();
-            for (int i = 0; i < maxOperations; i++) {
-                MetadataOp op = readOps.poll();
-                if (op == null) {
-                    break;
-                }
-                ops.add(op);
+        List<MetadataOp> currentBatch;
+        if (!readOps.isEmpty()) {
+            while (CollectionUtils.isNotEmpty(currentBatch = 
metadataStoreBatchStrategy.nextBatch(readOps))) {
+                internalBatchOperation(currentBatch);
             }
-            internalBatchOperation(ops);
         }
-
-        while (!writeOps.isEmpty()) {
-            int batchSize = 0;
-
-            List<MetadataOp> ops = new ArrayList<>();
-            for (int i = 0; i < maxOperations; i++) {
-                MetadataOp op = writeOps.peek();
-                if (op == null) {
-                    break;
-                }
-
-                if (i > 0 && (batchSize + op.size()) > maxSize) {
-                    // We have already reached the max size, so flush the 
current batch
-                    break;
-                }
-
-                batchSize += op.size();
-                ops.add(writeOps.poll());
+        if (!writeOps.isEmpty()) {
+            while (CollectionUtils.isNotEmpty(currentBatch = 
metadataStoreBatchStrategy.nextBatch(writeOps))) {
+                internalBatchOperation(currentBatch);
             }
-            internalBatchOperation(ops);
         }
 
         flushInProgress.set(false);
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/DefaultMetadataStoreBatchStrategy.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/DefaultMetadataStoreBatchStrategy.java
new file mode 100644
index 00000000000..766c2551775
--- /dev/null
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/DefaultMetadataStoreBatchStrategy.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.impl.batching;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.jctools.queues.MessagePassingQueue;
+
+/**
+ * The default batching strategy. A batch is bounded by the number of operations and by the accumulated
+ * size of the PUT and DELETE requests; the size of the response packet is not taken into account.
+ */
+public class DefaultMetadataStoreBatchStrategy implements MetadataStoreBatchStrategy {
+
+    private final int maxOperations;
+    private final int maxPutSize;
+
+    public DefaultMetadataStoreBatchStrategy(int maxOperations, int maxPutSize) {
+        this.maxOperations = maxOperations;
+        this.maxPutSize = maxPutSize;
+    }
+
+    /**
+     * Removes operations from the head of the queue until the queue is drained, the operation limit is
+     * reached, or taking the next operation would exceed the size limit.
+     *
+     * @param opsSrc the queue holding the pending operations
+     * @return the operations removed from the queue; empty when the queue has nothing to offer
+     */
+    @Override
+    public List<MetadataOp> nextBatch(MessagePassingQueue<MetadataOp> opsSrc) {
+        final List<MetadataOp> batch = new ArrayList<>();
+        int pendingBytes = 0;
+        while (!opsSrc.isEmpty()) {
+            final MetadataOp head = opsSrc.peek();
+            if (head == null) {
+                return batch;
+            }
+            // Only write requests carry a payload that counts against the request size limit.
+            switch (head.getType()) {
+                case PUT, DELETE -> pendingBytes += head.size();
+                default -> { }
+            }
+            final boolean overBudget = pendingBytes > maxPutSize;
+            if (overBudget && !batch.isEmpty()) {
+                // The head does not fit any more; leave it queued for the next batch.
+                return batch;
+            }
+            batch.add(opsSrc.poll());
+            if (batch.size() == maxOperations) {
+                return batch;
+            }
+        }
+        return batch;
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/MetadataStoreBatchStrategy.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/MetadataStoreBatchStrategy.java
new file mode 100644
index 00000000000..e37d80c27b3
--- /dev/null
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/MetadataStoreBatchStrategy.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.impl.batching;
+
+import java.util.List;
+import org.jctools.queues.MessagePassingQueue;
+
+/**
+ * Splits queued operations into multiple batches, because a single batch may not be able to hold all of them.
+ */
+public interface MetadataStoreBatchStrategy {
+
+    /**
+     * Get the next batch of operations to execute.
+     *
+     * @param ops the queue holding the pending operations
+     * @return the operations that make up the next batch
+     */
+    List<MetadataOp> nextBatch(MessagePassingQueue<MetadataOp> ops);
+}
\ No newline at end of file
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/ZKMetadataStoreBatchStrategy.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/ZKMetadataStoreBatchStrategy.java
new file mode 100644
index 00000000000..ff8cc74d4c1
--- /dev/null
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/ZKMetadataStoreBatchStrategy.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.impl.batching;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pulsar.metadata.api.MetadataNodeSizeStats;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.common.ZKConfig;
+import org.jctools.queues.MessagePassingQueue;
+
+/**
+ * Batching strategy for the ZooKeeper based metadata store.
+ *
+ * <p>In addition to the operation count and the request size, it estimates the size of the response a batch
+ * would produce, based on {@link MetadataNodeSizeStats}, and closes the batch before that estimate exceeds
+ * half of the maximum packet size.
+ */
+public class ZKMetadataStoreBatchStrategy implements MetadataStoreBatchStrategy {
+
+    // The headers of a response command contain the following attributes, which cost 88 bytes.
+    // Base attrs: xid(int), zxid(long), err(int), len(int)
+    // Stat attrs: czxid(long), mzxid(long), ctime(long), mtime(long), version(int), cversion(int), aversion(int)
+    //             ephemeralOwner(long), dataLength(int), numChildren(int), pzxid(long)
+    // The length of the response header may differ between versions; since only half of the max size is
+    // used as the limit, that difference can be ignored.
+    public static final int ZK_RESPONSE_HEADER_LEN = 88;
+    private final int defaultSize;
+
+    private final int maxOperations;
+    private final int maxGetSize;
+    private final int maxPutSize;
+    private final MetadataNodeSizeStats nodeSizeStats;
+
+    /**
+     * @param nodeSizeStats  statistics used to estimate the response size of read operations
+     * @param maxOperations  max number of operations in one batch
+     * @param defaultMaxSize max packet size to use when the client config yields a non-positive value
+     * @param zkc            ZooKeeper client whose config is read for {@code jute.maxbuffer}
+     */
+    public ZKMetadataStoreBatchStrategy(MetadataNodeSizeStats nodeSizeStats, int maxOperations,
+                                        int defaultMaxSize, ZooKeeper zkc) {
+        int maxSizeConfigured = zkc.getClientConfig().getInt(
+                ZKConfig.JUTE_MAXBUFFER,
+                ZKClientConfig.CLIENT_MAX_PACKET_LENGTH_DEFAULT);
+        maxSizeConfigured = maxSizeConfigured > 0 ? maxSizeConfigured : defaultMaxSize;
+        this.maxOperations = maxOperations;
+        this.maxGetSize = maxSizeConfigured;
+        this.maxPutSize = maxSizeConfigured;
+        this.nodeSizeStats = nodeSizeStats;
+        // If the size of a node can not be calculated by "nodeSizeStats", assume 1/16 of the max size, so that
+        // roughly 8 such ops fit into the half-sized response budget of one batch.
+        this.defaultSize = Math.max(maxPutSize >>> 4, 1024);
+    }
+
+    /**
+     * @return the max request size this strategy enforces
+     */
+    public int maxSize() {
+        return maxPutSize;
+    }
+
+    /**
+     * Removes operations from the head of the queue until the queue is drained, the operation limit is
+     * reached, or taking the next operation would exceed the request size limit or the estimated response
+     * size limit. The first operation of a batch is always taken, whatever its size.
+     *
+     * @param opsSrc the queue holding the pending operations
+     * @return the operations removed from the queue; empty when the queue has nothing to offer
+     */
+    @Override
+    public List<MetadataOp> nextBatch(MessagePassingQueue<MetadataOp> opsSrc) {
+        // Both sizes are accumulated as long: "childrenCount * itemSize" can exceed Integer.MAX_VALUE, and a
+        // wrapped (negative) int estimate would silently disable the size limits below.
+        long requestSize = 0;
+        long estimatedResponseSize = 0;
+        // Since the response size is estimated, we use half of the max size to be safe.
+        final int responseSizeLimit = this.maxGetSize >>> 1;
+        List<MetadataOp> ops = new ArrayList<>();
+        while (!opsSrc.isEmpty()) {
+            MetadataOp op = opsSrc.peek();
+            if (op == null) {
+                break;
+            }
+            MetadataOp.Type type = op.getType();
+            String path = op.getPath();
+            switch (type) {
+                case GET_CHILDREN: {
+                    estimatedResponseSize += ZK_RESPONSE_HEADER_LEN;
+                    int childrenCount = nodeSizeStats.getMaxChildrenCountOfSameResourceType(path);
+                    if (childrenCount < 0) {
+                        // A negative count is treated as "no estimate": fall back to the default size.
+                        estimatedResponseSize += defaultSize;
+                        break;
+                    }
+                    // The way that combines list of nodes is as follows
+                    // [4 bytes that indicates the length of the next item] [item name].
+                    // So we add 4 bytes for each item.
+                    int size = nodeSizeStats.getMaxSizeOfSameResourceType(path);
+                    long itemSize = size > 0 ? size : defaultSize;
+                    estimatedResponseSize += childrenCount * (itemSize + 4);
+                    break;
+                }
+                case GET: {
+                    estimatedResponseSize += ZK_RESPONSE_HEADER_LEN;
+                    int size = nodeSizeStats.getMaxSizeOfSameResourceType(path);
+                    estimatedResponseSize += size > 0 ? size : defaultSize;
+                    break;
+                }
+                case DELETE:
+                case PUT: {
+                    requestSize += op.size();
+                    // The response of creation contains two attributes: stat and path, so we add them into the
+                    // estimation response size.
+                    estimatedResponseSize += ZK_RESPONSE_HEADER_LEN;
+                    estimatedResponseSize += path.length();
+                    break;
+                }
+                default: {
+                    estimatedResponseSize += ZK_RESPONSE_HEADER_LEN;
+                    estimatedResponseSize += path.length();
+                }
+            }
+            if (!ops.isEmpty() && (estimatedResponseSize > responseSizeLimit || requestSize > maxPutSize)) {
+                // We have already reached the max size, so flush the current batch.
+                break;
+            }
+            ops.add(opsSrc.poll());
+            if (ops.size() == maxOperations) {
+                break;
+            }
+        }
+        return ops;
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
index c7e1ce24b08..7f0dc6fba10 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
@@ -61,7 +61,7 @@ public class OxiaMetadataStore extends AbstractMetadataStore {
     private Optional<MetadataEventSynchronizer> synchronizer;
 
     public OxiaMetadataStore(AsyncOxiaClient oxia, String identity) {
-        super("oxia-metadata", OpenTelemetry.noop());
+        super("oxia-metadata", OpenTelemetry.noop(), null);
         this.client = oxia;
         this.identity = identity;
         this.synchronizer = Optional.empty();
@@ -74,7 +74,8 @@ public class OxiaMetadataStore extends AbstractMetadataStore {
             MetadataStoreConfig metadataStoreConfig,
             boolean enableSessionWatcher)
             throws Exception {
-        super("oxia-metadata", 
Objects.requireNonNull(metadataStoreConfig).getOpenTelemetry());
+        super("oxia-metadata", 
Objects.requireNonNull(metadataStoreConfig).getOpenTelemetry(),
+                metadataStoreConfig.getNodeSizeStats());
 
         var linger = metadataStoreConfig.getBatchingMaxDelayMillis();
         if (!metadataStoreConfig.isBatchingEnabled()) {
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
index b9abaece9e4..d42b2228346 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
@@ -96,7 +96,7 @@ public class MetadataStoreFactoryImplTest {
 
     public static class MyMetadataStore extends AbstractMetadataStore {
         protected MyMetadataStore() {
-            super("custom", OpenTelemetry.noop());
+            super("custom", OpenTelemetry.noop(), null);
         }
 
         @Override
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/batching/ZKMetadataStoreBatchStrategyTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/batching/ZKMetadataStoreBatchStrategyTest.java
new file mode 100644
index 00000000000..4fe2611a4de
--- /dev/null
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/batching/ZKMetadataStoreBatchStrategyTest.java
@@ -0,0 +1,637 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.impl.batching;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.metadata.api.MetadataNodeSizeStats;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.jctools.queues.MpscUnboundedArrayQueue;
+import org.mockito.Mockito;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Comprehensive test suite for ZKMetadataStoreBatchStrategy class.
+ * This test covers:
+ * - Constructor and configuration handling
+ * - Batch size calculations for different operation types
+ * - Size limit enforcement for requests and responses
+ * - Operation count limits
+ * - Edge cases and boundary conditions
+ */
+public class ZKMetadataStoreBatchStrategyTest {
+
+    private ZKMetadataStoreBatchStrategy strategy;
+    private MetadataNodeSizeStats mockNodeSizeStats;
+    private ZooKeeper mockZooKeeper;
+    private ZKClientConfig mockClientConfig;
+    private MpscUnboundedArrayQueue<MetadataOp> operationQueue;
+
+    private static final int DEFAULT_MAX_OPERATIONS = 100;
+    private static final int DEFAULT_MAX_SIZE = 1024 * 1024; // 1MB
+    private static final String TEST_PATH = "/test/path";
+
+    @BeforeMethod
+    public void setUp() {
+        mockNodeSizeStats = Mockito.mock(MetadataNodeSizeStats.class);
+        mockZooKeeper = Mockito.mock(ZooKeeper.class);
+        mockClientConfig = Mockito.mock(ZKClientConfig.class);
+        operationQueue = new MpscUnboundedArrayQueue<>(1000);
+        // Setup default mock behavior
+        
Mockito.when(mockZooKeeper.getClientConfig()).thenReturn(mockClientConfig);
+        Mockito.when(mockClientConfig.getInt(Mockito.anyString(), 
Mockito.anyInt()))
+                .thenReturn(DEFAULT_MAX_SIZE);
+        // Setup default node size stats
+        
Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(Mockito.anyString()))
+                .thenReturn(100);
+        
Mockito.when(mockNodeSizeStats.getMaxChildrenCountOfSameResourceType(Mockito.anyString()))
+                .thenReturn(10);
+        strategy = new ZKMetadataStoreBatchStrategy(mockNodeSizeStats, 
DEFAULT_MAX_OPERATIONS,
+                DEFAULT_MAX_SIZE, mockZooKeeper);
+    }
+
+    @Test
+    public void testConstructorWithDefaultMaxSize() {
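+        // The constructor falls back to the default max size when the ZK client config returns 0
+        // (only the zero case is exercised here; negative values are not covered by this test)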
+        Mockito.when(mockClientConfig.getInt(Mockito.anyString(), 
Mockito.anyInt()))
+                .thenReturn(0);
+        ZKMetadataStoreBatchStrategy strategyWithDefault = new 
ZKMetadataStoreBatchStrategy(
+                mockNodeSizeStats, DEFAULT_MAX_OPERATIONS, DEFAULT_MAX_SIZE, 
mockZooKeeper);
+        assertEquals(strategyWithDefault.maxSize(), DEFAULT_MAX_SIZE);
+    }
+
+    @Test
+    public void testConstructorWithConfiguredMaxSize() {
+        int configuredSize = 2 * 1024 * 1024; // 2MB
+        Mockito.when(mockClientConfig.getInt(Mockito.anyString(), 
Mockito.anyInt()))
+                .thenReturn(configuredSize);
+        ZKMetadataStoreBatchStrategy strategyWithConfig = new 
ZKMetadataStoreBatchStrategy(
+                mockNodeSizeStats, DEFAULT_MAX_OPERATIONS, DEFAULT_MAX_SIZE, 
mockZooKeeper);
+        assertEquals(strategyWithConfig.maxSize(), configuredSize);
+    }
+
+    @Test
+    public void testMaxSizeMethod() {
+        assertEquals(strategy.maxSize(), DEFAULT_MAX_SIZE);
+    }
+
+    @Test
+    public void testEmptyQueue() {
+        List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+        assertTrue(batch.isEmpty());
+    }
+
+    @Test
+    public void testSingleGetOperation() {
+        MetadataOp getOp = createMockGetOperation(TEST_PATH);
+        operationQueue.offer(getOp);
+        List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+        assertEquals(batch.size(), 1);
+        assertEquals(batch.get(0), getOp);
+        assertTrue(operationQueue.isEmpty());
+    }
+
+    @Test
+    public void testSingleGetChildrenOperation() {
+        MetadataOp getChildrenOp = createMockGetChildrenOperation(TEST_PATH);
+        operationQueue.offer(getChildrenOp);
+        List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+        assertEquals(batch.size(), 1);
+        assertEquals(batch.get(0), getChildrenOp);
+        assertTrue(operationQueue.isEmpty());
+    }
+
+    @Test
+    public void testSinglePutOperation() {
+        MetadataOp putOp = createMockPutOperation(TEST_PATH, 100);
+        operationQueue.offer(putOp);
+        List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+        assertEquals(batch.size(), 1);
+        assertEquals(batch.get(0), putOp);
+        assertTrue(operationQueue.isEmpty());
+    }
+
+    @Test
+    public void testSingleDeleteOperation() {
+        MetadataOp deleteOp = createMockDeleteOperation(TEST_PATH);
+        operationQueue.offer(deleteOp);
+        List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+        assertEquals(batch.size(), 1);
+        assertEquals(batch.get(0), deleteOp);
+        assertTrue(operationQueue.isEmpty());
+    }
+
+    @Test
+    public void testMaxOperationsLimit() {
+        // Add more operations than the max limit
+        for (int i = 0; i < DEFAULT_MAX_OPERATIONS + 10; i++) {
+            operationQueue.offer(createMockGetOperation("/test/path/" + i));
+        }
+        List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+        assertEquals(batch.size(), DEFAULT_MAX_OPERATIONS);
+        assertEquals(operationQueue.size(), 10); // Remaining operations
+    }
+
+    @Test
+    public void testRequestSizeLimit() {
+        // Create large PUT operations that exceed the request size limit
+        int largeSize = DEFAULT_MAX_SIZE / 2 + 1000; // Larger than half max 
size
+        operationQueue.offer(createMockPutOperation("/test/path1", largeSize));
+        operationQueue.offer(createMockPutOperation("/test/path2", largeSize));
+        operationQueue.offer(createMockPutOperation("/test/path3", 100)); // 
Small operation
+        List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+        // Should only include the first operation due to size limit
+        assertEquals(batch.size(), 1);
+        assertEquals(operationQueue.size(), 2); // Two operations remaining
+    }
+
+    // Helper methods for creating mock operations
+    private MetadataOp createMockGetOperation(String path) {
+        MetadataOp op = Mockito.mock(MetadataOp.class);
+        Mockito.when(op.getType()).thenReturn(MetadataOp.Type.GET);
+        Mockito.when(op.getPath()).thenReturn(path);
+        Mockito.when(op.size()).thenReturn(0);
+        Mockito.when(op.getFuture()).thenReturn(new CompletableFuture<>());
+        return op;
+    }
+
+    private MetadataOp createMockGetChildrenOperation(String path) {
+        MetadataOp op = Mockito.mock(MetadataOp.class);
+        Mockito.when(op.getType()).thenReturn(MetadataOp.Type.GET_CHILDREN);
+        Mockito.when(op.getPath()).thenReturn(path);
+        Mockito.when(op.size()).thenReturn(0);
+        Mockito.when(op.getFuture()).thenReturn(new CompletableFuture<>());
+        return op;
+    }
+
+    private MetadataOp createMockPutOperation(String path, int size) {
+        MetadataOp op = Mockito.mock(MetadataOp.class);
+        Mockito.when(op.getType()).thenReturn(MetadataOp.Type.PUT);
+        Mockito.when(op.getPath()).thenReturn(path);
+        Mockito.when(op.size()).thenReturn(size);
+        Mockito.when(op.getFuture()).thenReturn(new CompletableFuture<>());
+        return op;
+    }
+
+    private MetadataOp createMockDeleteOperation(String path) {
+        MetadataOp op = Mockito.mock(MetadataOp.class);
+        Mockito.when(op.getType()).thenReturn(MetadataOp.Type.DELETE);
+        Mockito.when(op.getPath()).thenReturn(path);
+        Mockito.when(op.size()).thenReturn(path.length());
+        Mockito.when(op.getFuture()).thenReturn(new CompletableFuture<>());
+        return op;
+    }
+
+    @Test
+    public void testResponseSizeLimitForGetOperations() {
+        // Setup large response size for node stats
+        
Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(Mockito.anyString()))
+                .thenReturn(DEFAULT_MAX_SIZE / 5); // Large response size
+        // Add multiple GET operations
+        for (int i = 0; i < 10; i++) {
+            operationQueue.offer(createMockGetOperation("/test/path/" + i));
+        }
+        List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+        // Should limit based on estimated response size (half of max size)
+        assertEquals(batch.size(), 2);
+    }
+
+    @Test
+    public void testResponseSizeLimitForGetChildrenOperations() {
+        // Setup large response size for children operations
+        
Mockito.when(mockNodeSizeStats.getMaxChildrenCountOfSameResourceType(Mockito.anyString()))
+                .thenReturn(1000); // Many children
+        
Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(Mockito.anyString()))
+                .thenReturn(100); // Size per child
+        // Add multiple GET_CHILDREN operations
+        for (int i = 0; i < 10; i++) {
+            operationQueue.offer(createMockGetChildrenOperation("/test/path/" 
+ i));
+        }
+        List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+        // Should limit based on estimated response size.
+        // DEFAULT_MAX_SIZE is 1024 * 1024, so the response budget (half of it) is 1024 * 512 = 524288.
+        // Each GET_CHILDREN response is estimated at 1000 children * 100 bytes = 100000 bytes,
+        // so 5 operations fit in one batch.
+        assertEquals(batch.size(), 5);
+    }
+
+    @Test
+    public void testMixedOperationTypes() {
+        operationQueue.offer(createMockGetOperation("/test/get"));
+        operationQueue.offer(createMockPutOperation("/test/put", 100));
+        operationQueue.offer(createMockDeleteOperation("/test/delete"));
+        operationQueue.offer(createMockGetChildrenOperation("/test/children"));
+        List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+        assertEquals(batch.size(), 4);
+        assertTrue(operationQueue.isEmpty());
+    }
+
+    @Test
+    public void testNullOperationInQueue() {
+        // Test the case where peek() returns null (queue becomes empty during 
processing)
+        MetadataOp mockOp = createMockGetOperation("/test/path1");
+        @SuppressWarnings("unchecked")
+        MpscUnboundedArrayQueue<MetadataOp> mockQueue = 
Mockito.mock(MpscUnboundedArrayQueue.class);
+        Mockito.when(mockQueue.isEmpty()).thenReturn(false, false, true);
+        Mockito.when(mockQueue.peek()).thenReturn(mockOp, (MetadataOp) null);
+        Mockito.when(mockQueue.poll()).thenReturn(mockOp);
+        List<MetadataOp> batch = strategy.nextBatch(mockQueue);
+        assertEquals(batch.size(), 1);
+    }
+
+    @Test
+    public void testZKResponseHeaderCalculation() {
+        // Pin the expected value of the ZK_RESPONSE_HEADER_LEN constant
+        assertEquals(ZKMetadataStoreBatchStrategy.ZK_RESPONSE_HEADER_LEN, 88);
+        // Add a GET operation; whether the header length is included in the size estimate
+        // cannot be observed from this test
+        operationQueue.offer(createMockGetOperation(TEST_PATH));
+        // The actual size calculation is internal, but we can verify the 
operation is processed
+        List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+        assertEquals(batch.size(), 1);
+    }
+
+    @Test
+    public void testDefaultSizeFallbackForGetOperations() {
+        // Test that defaultSize is used when node size stats return negative 
values
+        // defaultSize = Math.max(maxPutSize >>> 4, 1024) = Math.max(1MB >>> 
4, 1024) = Math.max(65536, 1024) = 65536
+        
Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(Mockito.anyString()))
+                .thenReturn(-1); // Negative value should trigger defaultSize 
fallback
+        operationQueue.offer(createMockGetOperation("/test/path1"));
+        operationQueue.offer(createMockGetOperation("/test/path2"));
+        List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+        // Should process operations using defaultSize for size estimation
+        assertEquals(batch.size(), 2);
+        assertTrue(operationQueue.isEmpty());
+    }
+
+    @Test
+    public void testDefaultSizeFallbackForGetChildrenOperations() {
+        // Test defaultSize fallback for GET_CHILDREN when childrenCount < 0
+        
Mockito.when(mockNodeSizeStats.getMaxChildrenCountOfSameResourceType(Mockito.anyString()))
+                .thenReturn(-1); // Negative value should trigger defaultSize 
fallback
+        operationQueue.offer(createMockGetChildrenOperation("/test/path1"));
+        operationQueue.offer(createMockGetChildrenOperation("/test/path2"));
+        List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+        // Should process operations using defaultSize for size estimation
+        assertEquals(batch.size(), 2);
+        assertTrue(operationQueue.isEmpty());
+    }
+
+    @Test
+    public void 
testDefaultSizeFallbackForGetChildrenWithValidCountButInvalidSize() {
+        // Test defaultSize fallback when childrenCount >= 0 but size <= 0
+        
Mockito.when(mockNodeSizeStats.getMaxChildrenCountOfSameResourceType(Mockito.anyString()))
+                .thenReturn(5); // Valid children count
+        
Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(Mockito.anyString()))
+                .thenReturn(-1); // Invalid size should trigger defaultSize 
fallback
+        operationQueue.offer(createMockGetChildrenOperation("/test/path"));
+        List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+        assertEquals(batch.size(), 1);
+        assertTrue(operationQueue.isEmpty());
+    }
+
+    @Test
+    public void testDefaultOperationType() {
+        // The implementation doesn't handle null types, so let's test with a 
valid operation
+        // that would use the default case in the switch statement
+        MetadataOp putOp = createMockPutOperation(TEST_PATH, 50);
+        operationQueue.offer(putOp);
+        List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+        assertEquals(batch.size(), 1);
+        assertEquals(batch.get(0), putOp);
+    }
+
+    @Test
+    public void testLargePathNames() {
+        // Test with very long path names
+        String longPath = "/very/long/path/name/that/exceeds/normal/length/"
+                + "and/continues/for/a/very/long/time/to/test/path/handling/"
+                + "in/the/batch/strategy/implementation";
+        operationQueue.offer(createMockPutOperation(longPath, 100));
+        operationQueue.offer(createMockDeleteOperation(longPath));
+        List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+        assertEquals(batch.size(), 2);
+        assertTrue(operationQueue.isEmpty());
+    }
+
+    @Test
+    public void testZeroSizeOperations() {
+        // Test operations with zero size
+        operationQueue.offer(createMockPutOperation(TEST_PATH, 0));
+        operationQueue.offer(createMockGetOperation(TEST_PATH));
+        List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+        assertEquals(batch.size(), 2);
+        assertTrue(operationQueue.isEmpty());
+    }
+
+    @Test
+    public void testNodeSizeStatsIntegration() {
+        String path1 = "/test/path1";
+        String path2 = "/test/path2";
+
+        // Setup different sizes for different paths
+        
Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(path1)).thenReturn(1000);
+        
Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(path2)).thenReturn(2000);
+        
Mockito.when(mockNodeSizeStats.getMaxChildrenCountOfSameResourceType(path1)).thenReturn(50);
+        
Mockito.when(mockNodeSizeStats.getMaxChildrenCountOfSameResourceType(path2)).thenReturn(100);
+
+        operationQueue.offer(createMockGetOperation(path1));
+        operationQueue.offer(createMockGetChildrenOperation(path2));
+
+        List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+
+        assertEquals(batch.size(), 2);
+
+        // Verify that the node size stats were consulted
+        Mockito.verify(mockNodeSizeStats).getMaxSizeOfSameResourceType(path1);
+        
Mockito.verify(mockNodeSizeStats).getMaxChildrenCountOfSameResourceType(path2);
+    }
+
+    @Test
+    public void testBatchSizeCalculationAccuracy() {
+        // Test that batch size calculation prevents oversized batches
+
+        // Setup a scenario where the second operation would exceed the limit
+        
Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(Mockito.anyString()))
+                .thenReturn(DEFAULT_MAX_SIZE / 3); // Each operation uses 1/3 
of max size
+
+        operationQueue.offer(createMockGetOperation("/test/path1"));
+        operationQueue.offer(createMockGetOperation("/test/path2"));
+        operationQueue.offer(createMockGetOperation("/test/path3")); // This 
should not fit
+
+        List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+
+        // The third operation must never fit; the assertions below accept a batch of one or two
+        assertTrue(batch.size() <= 2);
+        assertTrue(batch.size() >= 1);
+    }
+
+    @Test
+    public void testMixedValidAndInvalidNodeSizeStats() {
+        // Test mixing operations with valid and invalid node size stats
+        String validPath = "/valid/path";
+        String invalidPath = "/invalid/path";
+
+        // Setup different responses for different paths
+        
Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(validPath)).thenReturn(500);
+        
Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(invalidPath)).thenReturn(-1);
+        
Mockito.when(mockNodeSizeStats.getMaxChildrenCountOfSameResourceType(validPath)).thenReturn(10);
+        
Mockito.when(mockNodeSizeStats.getMaxChildrenCountOfSameResourceType(invalidPath)).thenReturn(-1);
+
+        operationQueue.offer(createMockGetOperation(validPath));
+        operationQueue.offer(createMockGetOperation(invalidPath)); // Should 
use defaultSize
+        operationQueue.offer(createMockGetChildrenOperation(validPath));
+        operationQueue.offer(createMockGetChildrenOperation(invalidPath)); // 
Should use defaultSize
+
+        List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+
+        assertEquals(batch.size(), 4);
+        assertTrue(operationQueue.isEmpty());
+    }
+
+    @Test
+    public void testZeroSizeFromNodeStats() {
+        // Test handling of zero size from node stats (should use defaultSize)
+        
Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(Mockito.anyString()))
+                .thenReturn(0); // Zero size should trigger defaultSize 
fallback
+        
Mockito.when(mockNodeSizeStats.getMaxChildrenCountOfSameResourceType(Mockito.anyString()))
+                .thenReturn(5); // Valid children count
+
+        operationQueue.offer(createMockGetOperation("/test/get"));
+        operationQueue.offer(createMockGetChildrenOperation("/test/children"));
+
+        List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+
+        assertEquals(batch.size(), 2);
+        assertTrue(operationQueue.isEmpty());
+    }
+
+    @Test
+    public void testDynamicDefaultSizeCalculation() {
+        // Test that defaultSize is calculated dynamically based on maxPutSize
+        // defaultSize = Math.max(maxPutSize >>> 4, 1024)
+
+        // Test with default configuration (1MB)
+        // defaultSize = Math.max(1048576 >>> 4, 1024) = Math.max(65536, 1024) 
= 65536
+        assertEquals(strategy.maxSize(), DEFAULT_MAX_SIZE); // Verify 
maxPutSize
+
+        // Test with smaller maxPutSize
+        int smallMaxSize = 8192; // 8KB
+        Mockito.when(mockClientConfig.getInt(Mockito.anyString(), 
Mockito.anyInt()))
+                .thenReturn(smallMaxSize);
+
+        ZKMetadataStoreBatchStrategy smallStrategy = new 
ZKMetadataStoreBatchStrategy(
+                mockNodeSizeStats, DEFAULT_MAX_OPERATIONS, DEFAULT_MAX_SIZE, 
mockZooKeeper);
+
+        assertEquals(smallStrategy.maxSize(), smallMaxSize);
+
+        // For small maxPutSize, defaultSize should be Math.max(8192 >>> 4, 
1024) = Math.max(512, 1024) = 1024
+        // We can't directly access defaultSize, but we can test its behavior
+        
Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(Mockito.anyString()))
+                .thenReturn(-1); // Force defaultSize usage
+
+        MpscUnboundedArrayQueue<MetadataOp> smallQueue = new 
MpscUnboundedArrayQueue<>(1000);
+        smallQueue.offer(createMockGetOperation("/test/path"));
+
+        List<MetadataOp> batch = smallStrategy.nextBatch(smallQueue);
+        assertEquals(batch.size(), 1);
+    }
+
+    @Test
+    public void testDynamicDefaultSizeWithLargeMaxPutSize() {
+        // Test with very large maxPutSize
+        int largeMaxSize = 16 * 1024 * 1024; // 16MB
+        Mockito.when(mockClientConfig.getInt(Mockito.anyString(), 
Mockito.anyInt()))
+                .thenReturn(largeMaxSize);
+
+        ZKMetadataStoreBatchStrategy largeStrategy = new 
ZKMetadataStoreBatchStrategy(
+                mockNodeSizeStats, DEFAULT_MAX_OPERATIONS, DEFAULT_MAX_SIZE, 
mockZooKeeper);
+
+        assertEquals(largeStrategy.maxSize(), largeMaxSize);
+
+        // For large maxPutSize, defaultSize should be Math.max(16MB >>> 4, 
1024) = Math.max(1MB, 1024) = 1MB
+        // This allows for larger batches when size stats are unavailable
+        
Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(Mockito.anyString()))
+                .thenReturn(-1); // Force defaultSize usage
+
+        MpscUnboundedArrayQueue<MetadataOp> largeQueue = new 
MpscUnboundedArrayQueue<>(1000);
+
+        // Add multiple operations - with larger defaultSize, more should fit
+        for (int i = 0; i < 20; i++) {
+            largeQueue.offer(createMockGetOperation("/test/path/" + i));
+        }
+
+        List<MetadataOp> batch = largeStrategy.nextBatch(largeQueue);
+        assertTrue(batch.size() > 0);
+        assertTrue(batch.size() <= 20);
+    }
+
+    @Test
+    public void testDefaultSizeBehavior() {
+        // Test that the defaultSize field is used correctly for size 
calculations
+        // We can't directly access the private field, but we can test its 
behavior
+
+        // When node stats return negative values, defaultSize should be used
+        
Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(Mockito.anyString()))
+                .thenReturn(-1);
+
+        // Add many GET operations to test if defaultSize is being used for 
calculations
+        for (int i = 0; i < 100; i++) {
+            operationQueue.offer(createMockGetOperation("/test/path/" + i));
+        }
+
+        List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+
+        // With defaultSize = 65536 and a response budget of half the 1MB max size (524288 bytes),
+        // roughly 7 to 8 operations fit; the exact count depends on per-operation overhead
+        assertTrue(batch.size() >= 7);
+        assertTrue(batch.size() <= 8);
+    }
+
+    @Test
+    public void testGetChildrenWithZeroChildrenCount() {
+        // Test GET_CHILDREN with zero children count (valid case)
+        
Mockito.when(mockNodeSizeStats.getMaxChildrenCountOfSameResourceType(Mockito.anyString()))
+                .thenReturn(0); // Zero children is valid
+        
Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(Mockito.anyString()))
+                .thenReturn(100); // Valid size
+
+        
operationQueue.offer(createMockGetChildrenOperation("/empty/directory"));
+
+        List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+
+        assertEquals(batch.size(), 1);
+        assertTrue(operationQueue.isEmpty());
+    }
+
+    @Test
+    public void testEnhancedSizeCalculationLogic() {
+        // Test the enhanced size calculation logic with various scenarios
+
+        // Scenario 1: Valid stats
+        String path1 = "/valid/stats";
+        
Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(path1)).thenReturn(200);
+        
Mockito.when(mockNodeSizeStats.getMaxChildrenCountOfSameResourceType(path1)).thenReturn(5);
+
+        // Scenario 2: Invalid children count, valid size
+        String path2 = "/invalid/children";
+        
Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(path2)).thenReturn(300);
+        
Mockito.when(mockNodeSizeStats.getMaxChildrenCountOfSameResourceType(path2)).thenReturn(-1);
+
+        // Scenario 3: Valid children count, invalid size
+        String path3 = "/invalid/size";
+        
Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(path3)).thenReturn(-1);
+        
Mockito.when(mockNodeSizeStats.getMaxChildrenCountOfSameResourceType(path3)).thenReturn(3);
+
+        operationQueue.offer(createMockGetOperation(path1));
+        operationQueue.offer(createMockGetChildrenOperation(path1));
+        operationQueue.offer(createMockGetChildrenOperation(path2)); // Should 
use defaultSize
+        operationQueue.offer(createMockGetChildrenOperation(path3)); // Should 
use defaultSize for size
+        operationQueue.offer(createMockGetOperation(path2));
+        operationQueue.offer(createMockGetOperation(path3)); // Should use 
defaultSize
+
+        List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+
+        // All operations should be processed
+        assertEquals(batch.size(), 6);
+        assertTrue(operationQueue.isEmpty());
+    }
+
+    @Test
+    public void testDefaultSizeCalculationFormula() {
+        // Test the specific formula: defaultSize = Math.max(maxPutSize >>> 4, 
1024)
+
+        // Test case 1: maxPutSize = 1MB (default), defaultSize should be 
Math.max(65536, 1024) = 65536
+        int maxSize1 = 1024 * 1024; // 1MB
+        Mockito.when(mockClientConfig.getInt(Mockito.anyString(), 
Mockito.anyInt()))
+                .thenReturn(maxSize1);
+
+        ZKMetadataStoreBatchStrategy strategy1 = new 
ZKMetadataStoreBatchStrategy(
+                mockNodeSizeStats, DEFAULT_MAX_OPERATIONS, DEFAULT_MAX_SIZE, 
mockZooKeeper);
+
+        // Test case 2: maxPutSize = 8KB, defaultSize should be Math.max(512, 
1024) = 1024
+        int maxSize2 = 8 * 1024; // 8KB
+        Mockito.when(mockClientConfig.getInt(Mockito.anyString(), 
Mockito.anyInt()))
+                .thenReturn(maxSize2);
+
+        ZKMetadataStoreBatchStrategy strategy2 = new 
ZKMetadataStoreBatchStrategy(
+                mockNodeSizeStats, DEFAULT_MAX_OPERATIONS, DEFAULT_MAX_SIZE, 
mockZooKeeper);
+
+        // Test case 3: maxPutSize = 32MB, defaultSize should be Math.max(2MB, 
1024) = 2MB
+        int maxSize3 = 32 * 1024 * 1024; // 32MB
+        Mockito.when(mockClientConfig.getInt(Mockito.anyString(), 
Mockito.anyInt()))
+                .thenReturn(maxSize3);
+
+        ZKMetadataStoreBatchStrategy strategy3 = new 
ZKMetadataStoreBatchStrategy(
+                mockNodeSizeStats, DEFAULT_MAX_OPERATIONS, DEFAULT_MAX_SIZE, 
mockZooKeeper);
+
+        // Verify maxSize() returns the configured values
+        assertEquals(strategy1.maxSize(), maxSize1);
+        assertEquals(strategy2.maxSize(), maxSize2);
+        assertEquals(strategy3.maxSize(), maxSize3);
+
+        // Test behavior with invalid node stats to verify defaultSize usage
+        
Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(Mockito.anyString()))
+                .thenReturn(-1);
+
+        // All strategies should be able to process operations using their 
respective defaultSize
+        MpscUnboundedArrayQueue<MetadataOp> queue1 = new 
MpscUnboundedArrayQueue<>(10);
+        MpscUnboundedArrayQueue<MetadataOp> queue2 = new 
MpscUnboundedArrayQueue<>(10);
+        MpscUnboundedArrayQueue<MetadataOp> queue3 = new 
MpscUnboundedArrayQueue<>(10);
+
+        queue1.offer(createMockGetOperation("/test1"));
+        queue2.offer(createMockGetOperation("/test2"));
+        queue3.offer(createMockGetOperation("/test3"));
+
+        List<MetadataOp> batch1 = strategy1.nextBatch(queue1);
+        List<MetadataOp> batch2 = strategy2.nextBatch(queue2);
+        List<MetadataOp> batch3 = strategy3.nextBatch(queue3);
+
+        assertEquals(batch1.size(), 1);
+        assertEquals(batch2.size(), 1);
+        assertEquals(batch3.size(), 1);
+    }
+
+    @Test
+    public void testDefaultSizeMinimumValue() {
+        // Test that defaultSize never goes below 1024 bytes
+        int verySmallMaxSize = 1024; // 1KB
+        Mockito.when(mockClientConfig.getInt(Mockito.anyString(), 
Mockito.anyInt()))
+                .thenReturn(verySmallMaxSize);
+
+        ZKMetadataStoreBatchStrategy smallStrategy = new 
ZKMetadataStoreBatchStrategy(
+                mockNodeSizeStats, DEFAULT_MAX_OPERATIONS, DEFAULT_MAX_SIZE, 
mockZooKeeper);
+
+        // defaultSize should be Math.max(1024 >>> 4, 1024) = Math.max(64, 
1024) = 1024
+        assertEquals(smallStrategy.maxSize(), verySmallMaxSize);
+
+        // Test that operations can still be processed even with minimum 
defaultSize
+        
Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(Mockito.anyString()))
+                .thenReturn(-1);
+
+        MpscUnboundedArrayQueue<MetadataOp> smallQueue = new 
MpscUnboundedArrayQueue<>(10);
+        smallQueue.offer(createMockGetOperation("/test/small"));
+
+        List<MetadataOp> batch = smallStrategy.nextBatch(smallQueue);
+        assertEquals(batch.size(), 1);
+    }
+}
diff --git a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java 
b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
index 2682f038df2..c8f37e9b3fc 100644
--- a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
+++ b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
@@ -53,6 +53,7 @@ import org.apache.zookeeper.AsyncCallback.StringCallback;
 import org.apache.zookeeper.AsyncCallback.VoidCallback;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.client.ZKClientConfig;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.proto.DeleteRequest;
@@ -138,6 +139,7 @@ public class MockZooKeeper extends ZooKeeper {
     private int referenceCount;
     private List<AutoCloseable> closeables;
     private int sessionTimeout;
+    private ZKClientConfig zKClientConfig = new ZKClientConfig();
 
     //see details of Objenesis caching - http://objenesis.org/details.html
     //see supported jvms - 
https://github.com/easymock/objenesis/blob/master/SupportedJVMs.md
@@ -190,6 +192,7 @@ public class MockZooKeeper extends ZooKeeper {
         zk.sequentialIdGenerator = new AtomicLong();
         zk.closeables = new ArrayList<>();
         zk.sessionTimeout = 30_000;
+        zk.zKClientConfig = new ZKClientConfig();
         return zk;
     }
 
@@ -236,6 +239,11 @@ public class MockZooKeeper extends ZooKeeper {
         return runInExecutorReturningValue(() -> internalCreate(path, data, 
createMode));
     }
 
+    @Override
+    public ZKClientConfig getClientConfig() { // overrides the ZooKeeper accessor with a mock-owned config
+        return zKClientConfig; // field is also re-assigned when the mock instance is built
+    }
+
     private <T> T runInExecutorReturningValue(Callable<T> task)
             throws InterruptedException, KeeperException {
         if (isStopped()) {
diff --git a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java
index 766f70979aa..0da88c03c2e 100644
--- a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java
+++ b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java
@@ -27,6 +27,7 @@ import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
 import org.apache.zookeeper.AsyncCallback.DataCallback;
 import org.apache.zookeeper.AsyncCallback.StatCallback;
 import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.client.ZKClientConfig;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 import org.objenesis.Objenesis;
@@ -50,6 +51,8 @@ public class MockZooKeeperSession extends ZooKeeper {
 
     private int sessionTimeout = -1;
 
+    private ZKClientConfig zkClientConfig = new ZKClientConfig();
+
     public static MockZooKeeperSession newInstance(MockZooKeeper 
mockZooKeeper) {
         return newInstance(mockZooKeeper, true);
     }
@@ -61,6 +64,7 @@ public class MockZooKeeperSession extends ZooKeeper {
         mockZooKeeperSession.mockZooKeeper = mockZooKeeper;
         mockZooKeeperSession.sessionId = sessionIdGenerator.getAndIncrement();
         mockZooKeeperSession.closeMockZooKeeperOnClose = 
closeMockZooKeeperOnClose;
+        mockZooKeeperSession.zkClientConfig = new ZKClientConfig();
         if (closeMockZooKeeperOnClose) {
             mockZooKeeper.increaseRefCount();
         }
@@ -74,6 +78,11 @@ public class MockZooKeeperSession extends ZooKeeper {
         assert false;
     }
 
+    @Override
+    public ZKClientConfig getClientConfig() { // overrides the ZooKeeper accessor with a session-owned config
+        return zkClientConfig; // field is also re-assigned when a session instance is built
+    }
+
     @Override
     public int getSessionTimeout() {
         if (sessionTimeout > 0) {

Reply via email to