This is an automated email from the ASF dual-hosted git repository. yubiao pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.1 by this push: new d9972404759 [fix][broker]Fix never recovered metadata store bad version issue if received a large response from ZK (#24580) d9972404759 is described below commit d9972404759fcca132f07f08fd16778143acdbda Author: fengyubiao <yubiao.f...@streamnative.io> AuthorDate: Thu Sep 11 18:58:27 2025 +0800 [fix][broker]Fix never recovered metadata store bad version issue if received a large response from ZK (#24580) (cherry picked from commit 1cb64a921c946bfcebb16890a38b9e748ecb0e62) --- .../org/apache/pulsar/broker/PulsarService.java | 3 + .../zookeeper/DefaultMetadataNodeSizeStats.java | 262 +++++++++ .../broker/service/BrokerServiceChaosTest.java | 3 + .../CanReconnectZKClientPulsarServiceBaseTest.java | 4 +- .../ZKMetadataStoreBatchIOperationTest.java | 105 ++++ .../DefaultMetadataNodeSizeStatsSplitPathTest.java | 273 +++++++++ .../DefaultMetadataNodeSizeStatsTest.java | 484 ++++++++++++++++ .../metadata/api/DummyMetadataNodeSizeStats.java | 43 ++ .../pulsar/metadata/api/MetadataNodeSizeStats.java | 56 ++ .../pulsar/metadata/api/MetadataStoreConfig.java | 5 + .../metadata/impl/AbstractMetadataStore.java | 17 +- .../metadata/impl/LocalMemoryMetadataStore.java | 3 +- .../pulsar/metadata/impl/RocksdbMetadataStore.java | 3 +- .../pulsar/metadata/impl/ZKMetadataStore.java | 40 +- .../batching/AbstractBatchedMetadataStore.java | 45 +- .../DefaultMetadataStoreBatchStrategy.java | 68 +++ .../impl/batching/MetadataStoreBatchStrategy.java | 33 ++ .../batching/ZKMetadataStoreBatchStrategy.java | 131 +++++ .../metadata/impl/oxia/OxiaMetadataStore.java | 5 +- .../impl/MetadataStoreFactoryImplTest.java | 2 +- .../batching/ZKMetadataStoreBatchStrategyTest.java | 637 +++++++++++++++++++++ .../java/org/apache/zookeeper/MockZooKeeper.java | 8 + .../org/apache/zookeeper/MockZooKeeperSession.java | 9 + 23 files changed, 2190 insertions(+), 49 deletions(-) diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index fde755403ff..3d93355770f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -196,6 +196,7 @@ import org.apache.pulsar.websocket.WebSocketMultiTopicConsumerServlet; import org.apache.pulsar.websocket.WebSocketProducerServlet; import org.apache.pulsar.websocket.WebSocketReaderServlet; import org.apache.pulsar.websocket.WebSocketService; +import org.apache.pulsar.zookeeper.DefaultMetadataNodeSizeStats; import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.websocket.servlet.WebSocketServlet; import org.slf4j.Logger; @@ -427,6 +428,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { .metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE) .synchronizer(synchronizer) .openTelemetry(openTelemetry) + .nodeSizeStats(new DefaultMetadataNodeSizeStats()) .build()); } @@ -1274,6 +1276,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { .synchronizer(synchronizer) .metadataStoreName(MetadataStoreConfig.METADATA_STORE) .openTelemetry(openTelemetry) + .nodeSizeStats(new DefaultMetadataNodeSizeStats()) .build()); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/DefaultMetadataNodeSizeStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/DefaultMetadataNodeSizeStats.java new file mode 100644 index 00000000000..433d47624f1 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/DefaultMetadataNodeSizeStats.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.zookeeper; + +import io.netty.util.concurrent.FastThreadLocal; +import java.util.List; +import java.util.concurrent.atomic.AtomicReferenceArray; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.pulsar.metadata.api.GetResult; +import org.apache.pulsar.metadata.api.MetadataNodeSizeStats; + +@Slf4j +public class DefaultMetadataNodeSizeStats implements MetadataNodeSizeStats { + + public static final int UNSET = -1; + + private static final SplitPathRes MEANINGLESS_SPLIT_PATH_RES = new SplitPathRes(); + private static final FastThreadLocal<SplitPathRes> LOCAL_SPLIT_PATH_RES = new FastThreadLocal<SplitPathRes>() { + @Override + protected SplitPathRes initialValue() { + return new SplitPathRes(); + } + }; + private final AtomicReferenceArray<Integer> maxSizeMapping; + private final AtomicReferenceArray<Integer> maxChildrenCountMapping; + + public DefaultMetadataNodeSizeStats() { + int pathTypeCount = PathType.values().length; + maxSizeMapping = new AtomicReferenceArray<>(pathTypeCount); + maxChildrenCountMapping = new AtomicReferenceArray<>(pathTypeCount); + for (int i = 0; i < pathTypeCount; i++) { + maxSizeMapping.set(i, UNSET); + maxChildrenCountMapping.set(i, UNSET); + } + } + + @Override + public void recordPut(String path, byte[] data) { + PathType pathType = getPathType(path); + if (pathType 
== PathType.UNKNOWN) { + return; + } + maxSizeMapping.set(pathType.ordinal(), Math.max(maxSizeMapping.get(pathType.ordinal()), data.length)); + } + + @Override + public void recordGetRes(String path, GetResult getResult) { + PathType pathType = getPathType(path); + if (pathType == PathType.UNKNOWN || getResult == null) { + return; + } + maxSizeMapping.set(pathType.ordinal(), Math.max(maxSizeMapping.get(pathType.ordinal()), + getResult.getValue().length)); + } + + @Override + public void recordGetChildrenRes(String path, List<String> list) { + PathType pathType = getPathType(path); + if (pathType == PathType.UNKNOWN) { + return; + } + int size = CollectionUtils.isEmpty(list) ? 0 : list.size(); + maxChildrenCountMapping.set(pathType.ordinal(), Math.max(maxChildrenCountMapping.get(pathType.ordinal()), + size)); + } + + @Override + public int getMaxSizeOfSameResourceType(String path) { + PathType pathType = getPathType(path); + if (pathType == PathType.UNKNOWN) { + return -1; + } + return maxSizeMapping.get(pathType.ordinal()); + } + + @Override + public int getMaxChildrenCountOfSameResourceType(String path) { + PathType pathType = getPathType(path); + if (pathType == PathType.UNKNOWN) { + return -1; + } + return maxChildrenCountMapping.get(pathType.ordinal()); + } + + private PathType getPathType(String path) { + SplitPathRes splitPathRes = splitPath(path); + if (splitPathRes.partCount < 2) { + return PathType.UNKNOWN; + } + return switch (splitPathRes.parts[0]) { + case "admin" -> getAdminPathType(splitPathRes); + case "managed-ledgers" -> getMlPathType(splitPathRes); + case "loadbalance" -> getLoadBalancePathType(splitPathRes); + case "namespace" -> getBundleOwnerPathType(splitPathRes); + case "schemas" -> getSchemaPathType(splitPathRes); + default -> PathType.UNKNOWN; + }; + } + + private PathType getAdminPathType(SplitPathRes splitPathRes) { + return switch (splitPathRes.parts[1]) { + case "clusters" -> PathType.CLUSTER; + case "policies" -> switch 
(splitPathRes.partCount) { + case 3 -> PathType.TENANT; + case 4 -> PathType.NAMESPACE_POLICIES; + default -> PathType.UNKNOWN; + }; + case "local-policies" -> switch (splitPathRes.partCount) { + case 4 -> PathType.NAMESPACE_POLICIES; + default -> PathType.UNKNOWN; + }; + case "partitioned-topics" -> switch (splitPathRes.partCount) { + case 5 -> PathType.PARTITIONED_NAMESPACE; + case 6 -> PathType.PARTITIONED_TOPIC; + default -> PathType.UNKNOWN; + }; + default -> PathType.UNKNOWN; + }; + } + + private PathType getBundleOwnerPathType(SplitPathRes splitPathRes) { + return switch (splitPathRes.partCount) { + case 3 -> PathType.BUNDLE_OWNER_NAMESPACE; + case 4 -> PathType.BUNDLE_OWNER; + default -> PathType.UNKNOWN; + }; + } + + private PathType getSchemaPathType(SplitPathRes splitPathRes) { + if (splitPathRes.partCount == 4) { + return PathType.TOPIC_SCHEMA; + } + return PathType.UNKNOWN; + } + + private PathType getLoadBalancePathType(SplitPathRes splitPathRes) { + return switch (splitPathRes.parts[1]) { + case "brokers" -> switch (splitPathRes.partCount) { + case 2 -> PathType.BROKERS; + case 3 -> PathType.BROKER; + default -> PathType.UNKNOWN; + }; + case "bundle-data" -> switch (splitPathRes.partCount) { + case 4 -> PathType.BUNDLE_NAMESPACE; + case 5 -> PathType.BUNDLE_DATA; + default -> PathType.UNKNOWN; + }; + case "broker-time-average" -> switch (splitPathRes.partCount) { + case 3 -> PathType.BROKER_TIME_AVERAGE; + default -> PathType.UNKNOWN; + }; + case "leader" -> PathType.BROKER_LEADER; + default -> PathType.UNKNOWN; + }; + } + + private PathType getMlPathType(SplitPathRes splitPathRes) { + return switch (splitPathRes.partCount) { + case 4 -> PathType.ML_NAMESPACE; + case 5 -> PathType.TOPIC; + // v2 subscription and v1 topic. + case 6 -> PathType.SUBSCRIPTION; + // v1 subscription. 
+ case 7 -> PathType.SUBSCRIPTION; + default -> PathType.UNKNOWN; + }; + } + + enum PathType { + // admin + CLUSTER, + TENANT, + NAMESPACE_POLICIES, + LOCAL_POLICIES, + // load-balance + BROKERS, + BROKER, + BROKER_LEADER, + BUNDLE_NAMESPACE, + BUNDLE_DATA, + BROKER_TIME_AVERAGE , + BUNDLE_OWNER_NAMESPACE , + BUNDLE_OWNER , + // topic schema + TOPIC_SCHEMA, + // partitioned topics. + PARTITIONED_TOPIC, + PARTITIONED_NAMESPACE, + // managed ledger. + ML_NAMESPACE, + TOPIC, + SUBSCRIPTION, + UNKNOWN; + } + + static class SplitPathRes { + String[] parts = new String[2]; + int partCount; + + void reset() { + parts[0] = null; + parts[1] = null; + partCount = 0; + } + } + + /** + * Split the path by the delimiter '/', calculate pieces count and only keep the first two parts. + */ + static SplitPathRes splitPath(String path) { + if (path == null || path.length() <= 1) { + return MEANINGLESS_SPLIT_PATH_RES; + } + SplitPathRes res = LOCAL_SPLIT_PATH_RES.get(); + res.reset(); + String[] parts = res.parts; + char delimiter = '/'; + int length = path.length(); + int start = 0; + int count = 0; + for (int i = 0; i < length; i++) { + if (path.charAt(i) == delimiter) { + // Skip the first and the latest delimiter. + if (start == i) { + start = i + 1; + continue; + } + // Only keep the first two parts. 
+ if (count < 2) { + parts[count] = path.substring(start, i); + } + start = i + 1; + count++; + } + } + if (start < length) { + if (count < 2) { + parts[count] = path.substring(start); + } + count++; + } + res.partCount = count; + return res; + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java index 5650fe6e72f..96df607d7e4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java @@ -97,5 +97,8 @@ public class BrokerServiceChaosTest extends CanReconnectZKClientPulsarServiceBas PartitionedTopicMetadata partitionedTopicMetadata3 = pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName2, true).get(); assertEquals(partitionedTopicMetadata3.partitions, 3); + + // cleanup. + stopLocalMetadataStoreConnectionTermination(); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java index 41fe580ce2b..d3a780633c0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java @@ -198,7 +198,7 @@ public abstract class CanReconnectZKClientPulsarServiceBaseTest extends TestRetr log.info("--- OneWayReplicatorTestBase::setup completed ---"); } - private void setConfigDefaults(ServiceConfiguration config, String clusterName, + protected void setConfigDefaults(ServiceConfiguration config, String clusterName, LocalBookkeeperEnsemble bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) { config.setClusterName(clusterName); 
config.setAdvertisedAddress("localhost"); @@ -227,8 +227,6 @@ public abstract class CanReconnectZKClientPulsarServiceBaseTest extends TestRetr markCurrentSetupNumberCleaned(); log.info("--- Shutting down ---"); - stopLocalMetadataStoreConnectionTermination(); - // Stop brokers. if (client != null) { client.close(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZKMetadataStoreBatchIOperationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZKMetadataStoreBatchIOperationTest.java new file mode 100644 index 00000000000..bab495bb5f0 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZKMetadataStoreBatchIOperationTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.service; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.apache.pulsar.zookeeper.ZookeeperServerTest; +import org.apache.zookeeper.client.ZKClientConfig; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker") +public class ZKMetadataStoreBatchIOperationTest extends CanReconnectZKClientPulsarServiceBaseTest { + + @Override + @BeforeClass(alwaysRun = true, timeOut = 300000) + public void setup() throws Exception { + System.setProperty("jute.maxbuffer", "16384"); + super.setup(); + } + + @Override + @AfterClass(alwaysRun = true, timeOut = 300000) + public void cleanup() throws Exception { + System.clearProperty("jute.maxbuffer"); + super.cleanup(); + } + + protected void setConfigDefaults(ServiceConfiguration config, String clusterName, + LocalBookkeeperEnsemble bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) { + super.setConfigDefaults(config, clusterName, bookkeeperEnsemble, brokerConfigZk); + config.setMetadataStoreBatchingEnabled(true); + config.setMetadataStoreBatchingMaxOperations(1000); + config.setMetadataStoreBatchingMaxSizeKb(128); + config.setMetadataStoreBatchingMaxDelayMillis(20); + } + + @Test(timeOut = 1000 * 60 * 2) + public void testReceivedHugeResponse() throws Exception { + int maxPacketLen = Integer.parseInt(System.getProperty("jute.maxbuffer", + ZKClientConfig.CLIENT_MAX_PACKET_LENGTH_DEFAULT + "")); + String defaultTp = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + admin.topics().createNonPartitionedTopic(defaultTp); + + int nsCount = (maxPacketLen / 834) + 1; + log.info("Try to 
create {} namespaces", nsCount); + String[] nsArray = new String[nsCount]; + String[] tpArray = new String[nsCount]; + for (int i = 0; i < nsCount; i++) { + String ns = BrokerTestUtil.newUniqueName(defaultTenant + "/ns"); + nsArray[i] = ns; + String tp = BrokerTestUtil.newUniqueName("persistent://" + ns + "/tp"); + tpArray[i] = tp; + admin.namespaces().createNamespace(ns); + admin.topics().createPartitionedTopic(tp, 16); + } + + int len = pulsar.getLocalMetadataStore().getChildren("/managed-ledgers/" + nsArray[0] + "/persistent").join() + .stream().mapToInt(str -> str.length()).sum(); + log.info("Packet len of list topics of per namespace: {}", len); + + long start = System.currentTimeMillis(); + CompletableFuture<Void> createSubscriptionFuture = admin.topics() + .createSubscriptionAsync(defaultTp, "s1", MessageId.earliest); + for (int i = 0; i < nsCount; i++) { + pulsar.getLocalMetadataStore().getChildren("/managed-ledgers/" + nsArray[i] + "/persistent"); + } + log.info("Send multi ZK operations in {} ms. If it is larger than 20, may can not reproduce the issue", + (System.currentTimeMillis() - start)); + client.newConsumer().topic(defaultTp).subscriptionName("s1").subscribe().close(); + createSubscriptionFuture.get(10, TimeUnit.SECONDS); + + // cleanup. 
+ for (int i = 0; i < nsCount; i++) { + admin.topics().deletePartitionedTopic(tpArray[i]); + } + for (int i = 0; i < nsCount; i++) { + admin.namespaces().deleteNamespace(nsArray[i]); + } + admin.topics().delete(defaultTp); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/DefaultMetadataNodeSizeStatsSplitPathTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/DefaultMetadataNodeSizeStatsSplitPathTest.java new file mode 100644 index 00000000000..85d98dd7a6a --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/DefaultMetadataNodeSizeStatsSplitPathTest.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.zookeeper; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; +import org.testng.annotations.Test; + +public class DefaultMetadataNodeSizeStatsSplitPathTest { + + @Test + public void testSplitPathWithNullInput() { + // Test null input returns the meaningless split path result + DefaultMetadataNodeSizeStats.SplitPathRes result = + DefaultMetadataNodeSizeStats.splitPath(null); + + // Should return the static MEANINGLESS_SPLIT_PATH_RES instance + assertEquals(result.partCount, 0); + assertNull(result.parts[0]); + assertNull(result.parts[1]); + } + + @Test + public void testSplitPathWithEmptyString() { + // Test empty string returns the meaningless split path result + DefaultMetadataNodeSizeStats.SplitPathRes result = + DefaultMetadataNodeSizeStats.splitPath(""); + + assertEquals(result.partCount, 0); + assertNull(result.parts[0]); + assertNull(result.parts[1]); + } + + @Test + public void testSplitPathWithSingleCharacter() { + // Test single character string returns the meaningless split path result + DefaultMetadataNodeSizeStats.SplitPathRes result = + DefaultMetadataNodeSizeStats.splitPath("/"); + + assertEquals(result.partCount, 0); + assertNull(result.parts[0]); + assertNull(result.parts[1]); + } + + @Test + public void testSplitPathWithSingleSlash() { + // Test single slash returns the meaningless split path result + DefaultMetadataNodeSizeStats.SplitPathRes result = + DefaultMetadataNodeSizeStats.splitPath("/"); + + assertEquals(result.partCount, 0); + assertNull(result.parts[0]); + assertNull(result.parts[1]); + } + + @Test + public void testSplitPathWithTwoParts() { + // Test path with two parts: /admin/clusters + DefaultMetadataNodeSizeStats.SplitPathRes result = + DefaultMetadataNodeSizeStats.splitPath("/admin/clusters"); + + assertEquals(result.partCount, 2); + assertEquals(result.parts[0], "admin"); + assertEquals(result.parts[1], "clusters"); + } + + @Test + public void 
testSplitPathWithThreeParts() { + // Test path with three parts: /admin/policies/tenant + DefaultMetadataNodeSizeStats.SplitPathRes result = + DefaultMetadataNodeSizeStats.splitPath("/admin/policies/tenant"); + + assertEquals(result.partCount, 3); + assertEquals(result.parts[0], "admin"); + assertEquals(result.parts[1], "policies"); + // Third part should not be stored in parts array (only first two are kept) + } + + @Test + public void testSplitPathWithFourParts() { + // Test path with four parts: /admin/policies/tenant/namespace + DefaultMetadataNodeSizeStats.SplitPathRes result = + DefaultMetadataNodeSizeStats.splitPath("/admin/policies/tenant/namespace"); + + assertEquals(result.partCount, 4); + assertEquals(result.parts[0], "admin"); + assertEquals(result.parts[1], "policies"); + // Only first two parts are stored in the parts array + } + + @Test + public void testSplitPathWithFiveParts() { + // Test path with five parts: /admin/policies/tenant/namespace/topic + DefaultMetadataNodeSizeStats.SplitPathRes result = + DefaultMetadataNodeSizeStats.splitPath("/admin/policies/tenant/namespace/topic"); + + assertEquals(result.partCount, 5); + assertEquals(result.parts[0], "admin"); + assertEquals(result.parts[1], "policies"); + } + + @Test + public void testSplitPathWithSixParts() { + // Test path with six parts: /admin/partitioned-topics/persistent/tenant/namespace/topic + DefaultMetadataNodeSizeStats.SplitPathRes result = DefaultMetadataNodeSizeStats + .splitPath("/admin/partitioned-topics/persistent/tenant/namespace/topic"); + + assertEquals(result.partCount, 6); + assertEquals(result.parts[0], "admin"); + assertEquals(result.parts[1], "partitioned-topics"); + } + + @Test + public void testSplitPathWithManagedLedgerPath() { + // Test managed ledger path: /managed-ledgers/tenant/namespace/persistent/topic + DefaultMetadataNodeSizeStats.SplitPathRes result = + DefaultMetadataNodeSizeStats.splitPath("/managed-ledgers/tenant/namespace/persistent/topic"); + + 
assertEquals(result.partCount, 5); + assertEquals(result.parts[0], "managed-ledgers"); + assertEquals(result.parts[1], "tenant"); + } + + @Test + public void testSplitPathWithSubscriptionPath() { + // Test subscription path: /managed-ledgers/tenant/namespace/persistent/topic/subscription + DefaultMetadataNodeSizeStats.SplitPathRes result = DefaultMetadataNodeSizeStats + .splitPath("/managed-ledgers/tenant/namespace/persistent/topic/subscription"); + + assertEquals(result.partCount, 6); + assertEquals(result.parts[0], "managed-ledgers"); + assertEquals(result.parts[1], "tenant"); + } + + @Test + public void testSplitPathWithTrailingSlash() { + // Test path with trailing slash: /admin/clusters/ + DefaultMetadataNodeSizeStats.SplitPathRes result = + DefaultMetadataNodeSizeStats.splitPath("/admin/clusters/"); + + assertEquals(result.partCount, 2); + assertEquals(result.parts[0], "admin"); + assertEquals(result.parts[1], "clusters"); + } + + @Test + public void testSplitPathWithMultipleTrailingSlashes() { + // Test path with multiple trailing slashes: /admin/clusters/// + DefaultMetadataNodeSizeStats.SplitPathRes result = + DefaultMetadataNodeSizeStats.splitPath("/admin/clusters///"); + + assertEquals(result.partCount, 2); + assertEquals(result.parts[0], "admin"); + assertEquals(result.parts[1], "clusters"); + } + + @Test + public void testSplitPathWithConsecutiveSlashes() { + // Test path with consecutive slashes: /admin//clusters + DefaultMetadataNodeSizeStats.SplitPathRes result = + DefaultMetadataNodeSizeStats.splitPath("/admin//clusters"); + + assertEquals(result.partCount, 2); + assertEquals(result.parts[0], "admin"); + assertEquals(result.parts[1], "clusters"); + } + + @Test + public void testSplitPathWithLeadingSlashes() { + // Test path with multiple leading slashes: ///admin/clusters + DefaultMetadataNodeSizeStats.SplitPathRes result = + DefaultMetadataNodeSizeStats.splitPath("///admin/clusters"); + + assertEquals(result.partCount, 2); + 
assertEquals(result.parts[0], "admin");
+        assertEquals(result.parts[1], "clusters");
+    }
+
+    @Test
+    public void testSplitPathWithSinglePart() {
+        // Test path with single part: /admin
+        DefaultMetadataNodeSizeStats.SplitPathRes result =
+                DefaultMetadataNodeSizeStats.splitPath("/admin");
+
+        assertEquals(result.partCount, 1);
+        assertEquals(result.parts[0], "admin");
+        assertNull(result.parts[1]);
+    }
+
+    @Test
+    public void testSplitPathWithEmptyParts() {
+        // Empty segments are skipped: /admin//clusters//tenant yields 3 parts
+        DefaultMetadataNodeSizeStats.SplitPathRes result =
+                DefaultMetadataNodeSizeStats.splitPath("/admin//clusters//tenant");
+
+        assertEquals(result.partCount, 3);
+        assertEquals(result.parts[0], "admin");
+        assertEquals(result.parts[1], "clusters");
+    }
+
+    @Test
+    public void testSplitPathWithSpecialCharacters() {
+        // Test path with special characters: /admin-test/clusters_test
+        DefaultMetadataNodeSizeStats.SplitPathRes result =
+                DefaultMetadataNodeSizeStats.splitPath("/admin-test/clusters_test");
+
+        assertEquals(result.partCount, 2);
+        assertEquals(result.parts[0], "admin-test");
+        assertEquals(result.parts[1], "clusters_test");
+    }
+
+    @Test
+    public void testSplitPathWithNumbers() {
+        // Test path with numbers: /admin123/clusters456/tenant789
+        DefaultMetadataNodeSizeStats.SplitPathRes result =
+                DefaultMetadataNodeSizeStats.splitPath("/admin123/clusters456/tenant789");
+
+        assertEquals(result.partCount, 3);
+        assertEquals(result.parts[0], "admin123");
+        assertEquals(result.parts[1], "clusters456");
+    }
+
+    @Test
+    public void testSplitPathWithLongPath() {
+        // Test very long path with many parts
+        String longPath = "/part1/part2/part3/part4/part5/part6/part7/part8/part9/part10";
+        DefaultMetadataNodeSizeStats.SplitPathRes result =
+                DefaultMetadataNodeSizeStats.splitPath(longPath);
+
+        assertEquals(result.partCount, 10);
+        assertEquals(result.parts[0], "part1");
+        assertEquals(result.parts[1], "part2");
+        // partCount counts all 10 segments, but only the first two are stored in parts
+    }
+
+    @Test
+    public void testSplitPathWithShortValidPath() {
+        // Test shortest valid path: /a
+        DefaultMetadataNodeSizeStats.SplitPathRes result =
+                DefaultMetadataNodeSizeStats.splitPath("/a");
+
+        assertEquals(result.partCount, 1);
+        assertEquals(result.parts[0], "a");
+        assertNull(result.parts[1]);
+    }
+
+    @Test
+    public void testSplitPathWithTwoCharacterPath() {
+        // Test two character path: /ab
+        DefaultMetadataNodeSizeStats.SplitPathRes result =
+                DefaultMetadataNodeSizeStats.splitPath("/ab");
+
+        assertEquals(result.partCount, 1);
+        assertEquals(result.parts[0], "ab");
+        assertNull(result.parts[1]);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/DefaultMetadataNodeSizeStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/DefaultMetadataNodeSizeStatsTest.java
new file mode 100644
index 00000000000..bfbed2324d1
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/DefaultMetadataNodeSizeStatsTest.java
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.zookeeper;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.Stat;
+import org.mockito.Mockito;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Comprehensive test suite for DefaultMetadataNodeSizeStats class.
+ *
+ * <p>This test covers all the functionality of the DefaultMetadataNodeSizeStats class including:
+ * - Path type classification for different Pulsar metadata paths
+ * - Size tracking for put and get operations
+ * - Children count tracking for list operations
+ * - Proper handling of edge cases and null values
+ * - Path splitting functionality
+ */
+public class DefaultMetadataNodeSizeStatsTest {
+
+    private DefaultMetadataNodeSizeStats stats;
+
+    @BeforeMethod
+    public void setUp() {
+        stats = new DefaultMetadataNodeSizeStats();
+    }
+
+    @Test
+    public void testRecordPutAndGetMaxSize() {
+        // Test with cluster path
+        String clusterPath = "/admin/clusters/test-cluster";
+        byte[] smallData = "small".getBytes(StandardCharsets.UTF_8);
+        byte[] largeData = "this is a much larger data payload".getBytes(StandardCharsets.UTF_8);
+
+        // Initially should return UNSET (-1)
+        assertEquals(stats.getMaxSizeOfSameResourceType(clusterPath), DefaultMetadataNodeSizeStats.UNSET);
+
+        // Record small data first
+        stats.recordPut(clusterPath, smallData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(clusterPath), smallData.length);
+
+        // Record larger data - should update the max
+        stats.recordPut(clusterPath, largeData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(clusterPath), largeData.length);
+
+        // Record smaller data again - should not change the max
+        stats.recordPut(clusterPath, smallData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(clusterPath), largeData.length);
+    }
+
+    @Test
+    public void testRecordGetResAndGetMaxSize() {
+        String tenantPath = "/admin/policies/test-tenant";
+        byte[] data1 = "data1".getBytes(StandardCharsets.UTF_8);
+        byte[] data2 = "much longer data payload".getBytes(StandardCharsets.UTF_8);
+
+        // Create mock GetResult objects
+        Stat mockStat = Mockito.mock(Stat.class);
+        GetResult getResult1 = new GetResult(data1, mockStat);
+        GetResult getResult2 = new GetResult(data2, mockStat);
+
+        // Initially should return UNSET (-1)
+        assertEquals(stats.getMaxSizeOfSameResourceType(tenantPath), DefaultMetadataNodeSizeStats.UNSET);
+
+        // Record first result
+        stats.recordGetRes(tenantPath, getResult1);
+        assertEquals(stats.getMaxSizeOfSameResourceType(tenantPath), data1.length);
+
+        // Record larger result - should update the max
+        stats.recordGetRes(tenantPath, getResult2);
+        assertEquals(stats.getMaxSizeOfSameResourceType(tenantPath), data2.length);
+
+        // Record smaller result again - should not change the max
+        stats.recordGetRes(tenantPath, getResult1);
+        assertEquals(stats.getMaxSizeOfSameResourceType(tenantPath), data2.length);
+    }
+
+    @Test
+    public void testRecordGetChildrenResAndGetMaxChildrenCount() {
+        String namespacePath = "/admin/policies/test-tenant/test-namespace";
+        List<String> smallList = Arrays.asList("a", "b", "c");
+        List<String> largeList = Arrays.asList("longer-name-1", "longer-name-2", "longer-name-3", "longer-name-4");
+
+        // Initially should return UNSET (-1)
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(namespacePath), DefaultMetadataNodeSizeStats.UNSET);
+
+        // Record small list first - should track the count, not the total length
+        stats.recordGetChildrenRes(namespacePath, smallList);
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(namespacePath), smallList.size());
+
+        // Record larger list - should update the max count
+        stats.recordGetChildrenRes(namespacePath, largeList);
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(namespacePath), largeList.size());
+
+        // Record smaller list again - should not change the max
+        stats.recordGetChildrenRes(namespacePath, smallList);
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(namespacePath), largeList.size());
+    }
+
+    @Test
+    public void testUnknownPathsReturnMinusOne() {
+        // Test that unknown paths return -1 (not UNSET from internal storage)
+        String unknownPath = "/some/unknown/path";
+        assertEquals(stats.getMaxSizeOfSameResourceType(unknownPath), -1);
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(unknownPath), -1);
+
+        // Test that recording data for unknown paths is ignored
+        byte[] data = "test-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(unknownPath, data);
+        assertEquals(stats.getMaxSizeOfSameResourceType(unknownPath), -1);
+
+        // Test that recording children for unknown paths is ignored
+        List<String> children = Arrays.asList("child1", "child2");
+        stats.recordGetChildrenRes(unknownPath, children);
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(unknownPath), -1);
+    }
+
+    @Test
+    public void testNullGetResult() {
+        String testPath = "/admin/clusters/test";
+
+        // Recording null GetResult should not affect the stats
+        stats.recordGetRes(testPath, null);
+        assertEquals(stats.getMaxSizeOfSameResourceType(testPath), DefaultMetadataNodeSizeStats.UNSET);
+
+        // Record valid data first
+        byte[] data = "test-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(testPath, data);
+        assertEquals(stats.getMaxSizeOfSameResourceType(testPath), data.length);
+
+        // Recording null GetResult should not change the existing max
+        stats.recordGetRes(testPath, null);
+        assertEquals(stats.getMaxSizeOfSameResourceType(testPath), data.length);
+    }
+
+    @Test
+    public void testAdminPathTypeClassification() {
+        // Test cluster paths
+        String clusterPath = "/admin/clusters/test-cluster";
+        byte[] clusterData = "cluster-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(clusterPath, clusterData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(clusterPath), clusterData.length);
+
+        // Test tenant paths (policies with 2 parts)
+        String tenantPath = "/admin/policies/test-tenant";
+        byte[] tenantData = "tenant-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(tenantPath, tenantData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(tenantPath), tenantData.length);
+
+        // Test namespace policy paths (policies with 3 parts)
+        String namespacePath = "/admin/policies/test-tenant/test-namespace";
+        byte[] namespaceData = "namespace-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(namespacePath, namespaceData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(namespacePath), namespaceData.length);
+
+        // Test local policies paths
+        String localPoliciesPath = "/admin/local-policies/test-tenant/test-namespace";
+        byte[] localPoliciesData = "local-policies-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(localPoliciesPath, localPoliciesData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(localPoliciesPath), localPoliciesData.length);
+
+        // Test partitioned namespace paths (5 parts)
+        String partitionedNamespacePath = "/admin/partitioned-topics/persistent/test-tenant/test-namespace";
+        byte[] partitionedNamespaceData = "partitioned-namespace-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(partitionedNamespacePath, partitionedNamespaceData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(partitionedNamespacePath), partitionedNamespaceData.length);
+
+        // Test partitioned topic paths (6 parts)
+        String partitionedTopicPath = "/admin/partitioned-topics/persistent/test-tenant/test-namespace/test-topic";
+        byte[] partitionedTopicData = "partitioned-topic-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(partitionedTopicPath, partitionedTopicData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(partitionedTopicPath), partitionedTopicData.length);
+
+        // Verify that different path types maintain separate max values
+        assertNotEquals(stats.getMaxSizeOfSameResourceType(clusterPath),
+                stats.getMaxSizeOfSameResourceType(tenantPath));
+        assertNotEquals(stats.getMaxSizeOfSameResourceType(tenantPath),
+                stats.getMaxSizeOfSameResourceType(namespacePath));
+    }
+
+    @Test
+    public void testManagedLedgerPathTypes() {
+        // Test ML namespace paths (4 parts)
+        String mlNamespacePath = "/managed-ledgers/tenant/namespace/persistent";
+        byte[] mlNamespaceData = "ml-namespace-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(mlNamespacePath, mlNamespaceData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(mlNamespacePath), mlNamespaceData.length);
+
+        // Test topic paths (5 parts)
+        String topicPath = "/managed-ledgers/tenant/namespace/persistent/topic";
+        byte[] topicData = "topic-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(topicPath, topicData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(topicPath), topicData.length);
+
+        // Test v2 subscription paths (6 parts)
+        String v2SubscriptionPath = "/managed-ledgers/tenant/namespace/persistent/topic/subscription";
+        byte[] v2SubscriptionData = "v2-subscription-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(v2SubscriptionPath, v2SubscriptionData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(v2SubscriptionPath), v2SubscriptionData.length);
+
+        // Test v1 subscription paths (7 parts)
+        String v1SubscriptionPath = "/managed-ledgers/tenant/cluster/namespace/persistent/topic/subscription";
+        byte[] v1SubscriptionData = "v1-subscription-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(v1SubscriptionPath, v1SubscriptionData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(v1SubscriptionPath), v1SubscriptionData.length);
+
+        // Verify that v2 and v1 subscriptions use the same path type (SUBSCRIPTION)
+        assertEquals(stats.getMaxSizeOfSameResourceType(v2SubscriptionPath),
+                Math.max(v2SubscriptionData.length, v1SubscriptionData.length));
+        assertEquals(stats.getMaxSizeOfSameResourceType(v1SubscriptionPath),
+                Math.max(v2SubscriptionData.length, v1SubscriptionData.length));
+
+        // Verify different path types maintain separate max values
+        assertNotEquals(stats.getMaxSizeOfSameResourceType(mlNamespacePath),
+                stats.getMaxSizeOfSameResourceType(topicPath));
+        assertNotEquals(stats.getMaxSizeOfSameResourceType(topicPath),
+                stats.getMaxSizeOfSameResourceType(v2SubscriptionPath));
+    }
+
+    @Test
+    public void testLoadBalancePathTypes() {
+        // Test brokers path (2 parts)
+        String brokersPath = "/loadbalance/brokers";
+        byte[] brokersData = "brokers-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(brokersPath, brokersData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(brokersPath), brokersData.length);
+
+        // Test broker path (3 parts)
+        String brokerPath = "/loadbalance/brokers/broker1";
+        byte[] brokerData = "broker-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(brokerPath, brokerData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(brokerPath), brokerData.length);
+
+        // Test bundle namespace path (4 parts)
+        String bundleNamespacePath = "/loadbalance/bundle-data/tenant/namespace";
+        byte[] bundleNamespaceData = "bundle-namespace-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(bundleNamespacePath, bundleNamespaceData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(bundleNamespacePath), bundleNamespaceData.length);
+
+        // Test bundle data path (5 parts)
+        String bundleDataPath = "/loadbalance/bundle-data/tenant/namespace/bundle";
+        byte[] bundleData = "bundle-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(bundleDataPath, bundleData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(bundleDataPath), bundleData.length);
+
+        // Test broker time average path (3 parts)
+        String brokerTimeAvgPath = "/loadbalance/broker-time-average/broker1";
+        byte[] brokerTimeAvgData = "broker-time-avg-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(brokerTimeAvgPath, brokerTimeAvgData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(brokerTimeAvgPath), brokerTimeAvgData.length);
+
+        // Test leader path
+        String leaderPath = "/loadbalance/leader";
+        byte[] leaderData = "leader-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(leaderPath, leaderData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(leaderPath), leaderData.length);
+
+        // Verify different path types maintain separate max values
+        assertNotEquals(stats.getMaxSizeOfSameResourceType(brokersPath),
+                stats.getMaxSizeOfSameResourceType(brokerPath));
+    }
+
+    @Test
+    public void testBundleOwnerPathTypes() {
+        // Test bundle owner namespace path (3 parts)
+        String bundleOwnerNamespacePath = "/namespace/tenant/namespace";
+        byte[] bundleOwnerNamespaceData = "bundle-owner-namespace-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(bundleOwnerNamespacePath, bundleOwnerNamespaceData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(bundleOwnerNamespacePath), bundleOwnerNamespaceData.length);
+
+        // Test bundle owner path (4 parts)
+        String bundleOwnerPath = "/namespace/tenant/namespace/bundle";
+        byte[] bundleOwnerData = "bundle-owner-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(bundleOwnerPath, bundleOwnerData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(bundleOwnerPath), bundleOwnerData.length);
+
+        // Verify different path types maintain separate max values
+        assertNotEquals(stats.getMaxSizeOfSameResourceType(bundleOwnerNamespacePath),
+                stats.getMaxSizeOfSameResourceType(bundleOwnerPath));
+    }
+
+    @Test
+    public void testSchemaPathTypes() {
+        // Test topic schema path (4 parts)
+        String schemaPath = "/schemas/tenant/namespace/topic";
+        byte[] schemaData = "schema-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(schemaPath, schemaData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(schemaPath), schemaData.length);
+
+        // Test invalid schema path (3 parts) - should be UNKNOWN
+        String invalidSchemaPath = "/schemas/tenant/namespace";
+        byte[] invalidSchemaData = "invalid-schema-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(invalidSchemaPath, invalidSchemaData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(invalidSchemaPath), -1);
+
+        // Verify they use different path types
+        assertNotEquals(stats.getMaxSizeOfSameResourceType(schemaPath),
+                stats.getMaxSizeOfSameResourceType(invalidSchemaPath));
+    }
+
+    @Test
+    public void testShortAndUnknownPaths() {
+        // Test paths that are too short to be classified
+        String shortPath1 = "/";
+        String shortPath2 = "/admin";
+
+        byte[] data = "test-data".getBytes(StandardCharsets.UTF_8);
+
+        // Recording data for unknown paths should be ignored
+        stats.recordPut(shortPath1, data);
+        stats.recordPut(shortPath2, data);
+
+        // Both should return -1 since they are UNKNOWN path types
+        assertEquals(stats.getMaxSizeOfSameResourceType(shortPath1), -1);
+        assertEquals(stats.getMaxSizeOfSameResourceType(shortPath2), -1);
+
+        // Test unknown admin paths
+        String unknownAdminPath = "/admin/unknown/path";
+        byte[] unknownData = "unknown-admin-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(unknownAdminPath, unknownData);
+
+        // Should return -1 since it's an UNKNOWN path type
+        assertEquals(stats.getMaxSizeOfSameResourceType(unknownAdminPath), -1);
+
+        // Test children recording for unknown paths
+        List<String> children = Arrays.asList("child1", "child2");
+        stats.recordGetChildrenRes(shortPath1, children);
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(shortPath1), -1);
+    }
+
+    @Test
+    public void testEmptyAndZeroSizeData() {
+        String testPath = "/admin/clusters/test";
+
+        // Test with empty data
+        byte[] emptyData = new byte[0];
+        stats.recordPut(testPath, emptyData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(testPath), 0);
+
+        // Test with empty list
+        List<String> emptyList = Collections.emptyList();
+        stats.recordGetChildrenRes(testPath, emptyList);
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(testPath), 0);
+
+        // Record larger data - should update the max
+        byte[] largerData = "larger-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(testPath, largerData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(testPath), largerData.length);
+
+        // Record larger list - should update the max
+        List<String> largerList = Arrays.asList("item1", "item2", "item3");
+        stats.recordGetChildrenRes(testPath, largerList);
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(testPath), largerList.size());
+    }
+
+    @Test
+    public void testMixedOperationsOnSamePath() {
+        String testPath = "/admin/policies/test-tenant";
+
+        // Record via put
+        byte[] putData = "put-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(testPath, putData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(testPath), putData.length);
+
+        // Record via get with larger data - should update max
+        byte[] largerData = "much larger get result data".getBytes(StandardCharsets.UTF_8);
+        Stat mockStat = Mockito.mock(Stat.class);
+        GetResult getResult = new GetResult(largerData, mockStat);
+        stats.recordGetRes(testPath, getResult);
+        assertEquals(stats.getMaxSizeOfSameResourceType(testPath), largerData.length);
+
+        // Record via put with smaller data - should not change max
+        byte[] smallerData = "small".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(testPath, smallerData);
+        assertEquals(stats.getMaxSizeOfSameResourceType(testPath), largerData.length);
+
+        // Test children count operations
+        List<String> smallList = Arrays.asList("a", "b");
+        stats.recordGetChildrenRes(testPath, smallList);
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(testPath), smallList.size());
+
+        List<String> largerList = Arrays.asList("a", "b", "c", "d", "e");
+        stats.recordGetChildrenRes(testPath, largerList);
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(testPath), largerList.size());
+    }
+
+    @Test
+    public void testSplitPathFunctionality() {
+        // Test the static splitPath method indirectly through path type classification
+
+        // Test path with trailing slashes
+        String pathWithTrailingSlash = "/admin/clusters/test-cluster/";
+        byte[] data = "test-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(pathWithTrailingSlash, data);
+        assertEquals(stats.getMaxSizeOfSameResourceType(pathWithTrailingSlash), data.length);
+
+        // Should be classified the same as path without trailing slash
+        String pathWithoutTrailingSlash = "/admin/clusters/test-cluster";
+        assertEquals(stats.getMaxSizeOfSameResourceType(pathWithoutTrailingSlash), data.length);
+
+        // Lookup only (nothing recorded): consecutive slashes must resolve to the same path type
+        String pathWithMultipleSlashes = "/admin//clusters//test-cluster";
+        assertEquals(stats.getMaxSizeOfSameResourceType(pathWithMultipleSlashes), data.length);
+    }
+
+    @Test
+    public void testUnknownPathTypesAreIgnored() {
+        // Test that UNKNOWN path types don't affect any statistics
+        String[] unknownPaths = {
+            "/",
+            "/admin",
+            "/unknown/path",
+            "/admin/unknown/path/type",
+            "/managed-ledgers", // too short
+            "/loadbalance", // too short
+            "/schemas/tenant", // too short (needs 4 parts)
+            "/admin/policies/tenant/namespace/extra", // too long for policies
+            "/admin/partitioned-topics/persistent", // too short for partitioned topics
+            "/admin/partitioned-topics/persistent/tenant/namespace/topic/extra" // too long
+        };
+
+        byte[] data = "test-data".getBytes(StandardCharsets.UTF_8);
+        List<String> children = Arrays.asList("child1", "child2", "child3");
+
+        for (String unknownPath : unknownPaths) {
+            // Record operations should be ignored
+            stats.recordPut(unknownPath, data);
+            stats.recordGetChildrenRes(unknownPath, children);
+
+            // Should always return -1
+            assertEquals(stats.getMaxSizeOfSameResourceType(unknownPath), -1,
+                    "Path should return -1: " + unknownPath);
+            assertEquals(stats.getMaxChildrenCountOfSameResourceType(unknownPath), -1,
+                    "Path should return -1: " + unknownPath);
+        }
+    }
+
+    @Test
+    public void testEmptyAndNullListHandling() {
+        String testPath = "/admin/clusters/test";
+
+        // A null list must not throw and is counted as zero children
+        stats.recordGetChildrenRes(testPath, null);
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(testPath), 0);
+
+        // Test with empty list
+        List<String> emptyList = Collections.emptyList();
+        stats.recordGetChildrenRes(testPath, emptyList);
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(testPath), 0);
+
+        // Test with non-empty list - should update the max
+        List<String> nonEmptyList = Arrays.asList("item1", "item2", "item3");
+        stats.recordGetChildrenRes(testPath, nonEmptyList);
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(testPath), nonEmptyList.size());
+
+        // Test with empty list again - should not change the max
+        stats.recordGetChildrenRes(testPath, emptyList);
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(testPath), nonEmptyList.size());
+    }
+}
\ No newline at end of file
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/DummyMetadataNodeSizeStats.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/DummyMetadataNodeSizeStats.java
new file mode 100644
index 00000000000..ba60c662588
--- /dev/null
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/DummyMetadataNodeSizeStats.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.api;
+
+import java.util.List;
+
+public class DummyMetadataNodeSizeStats implements MetadataNodeSizeStats {
+    // No-op implementation: every record* call is ignored and both getters return a constant 1.
+    @Override
+    public void recordPut(String path, byte[] data) {}
+
+    @Override
+    public void recordGetRes(String path, GetResult getResult) {}
+
+    @Override
+    public void recordGetChildrenRes(String path, List<String> list) {}
+
+    @Override
+    public int getMaxSizeOfSameResourceType(String path) {
+        return 1;
+    }
+
+    @Override
+    public int getMaxChildrenCountOfSameResourceType(String path) {
+        return 1;
+    }
+}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataNodeSizeStats.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataNodeSizeStats.java
new file mode 100644
index 00000000000..92d6084b46c
--- /dev/null
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataNodeSizeStats.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.api;
+
+import java.util.List;
+
+/**
+ * Tracks the max payload length and children count observed per metadata resource type. This is useful to:
+ * 1. bound the response size of batched metadata-store reads. For example, the ZK client limits a response packet
+ * to 1MB by default; a larger packet makes the client fail with "Packet len {len} is out of range!" and reconnect.
+ * 2. expose metrics about the payload length of metadata nodes.
+ * Implementations decide how a path is mapped to its resource type.
+ */
+public interface MetadataNodeSizeStats {
+
+    /**
+     * Records the payload length of the data written to {@code path}.
+     */
+    void recordPut(String path, byte[] data);
+
+    /**
+     * Records the payload length of a get result read from {@code path}.
+     */
+    void recordGetRes(String path, GetResult getResult);
+
+    /**
+     * Records the number of children returned when listing {@code path} (a count, not a byte length).
+     */
+    void recordGetChildrenRes(String path, List<String> list);
+
+    /**
+     * Returns the max recorded payload length among nodes of the same resource type as {@code path}.
+     */
+    int getMaxSizeOfSameResourceType(String path);
+
+    /**
+     * Returns the max recorded children count among nodes of the same resource type as {@code path}.
+     */
+    int getMaxChildrenCountOfSameResourceType(String path);
+}
\ No newline at end of file
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
index be29f843eea..ef50dc87691 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
@@ -99,4 +99,9 @@ public class MetadataStoreConfig {
      */
     @Builder.Default
     private OpenTelemetry openTelemetry = OpenTelemetry.noop();
+
+    /**
+     * Node size stats used to bound batch request sizes; if null, AbstractMetadataStore uses a dummy instance.
+     */
+    private MetadataNodeSizeStats nodeSizeStats;
 }
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index 506df8b631d..b0e4b43f700 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -51,11 +51,13 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.stats.CacheMetricsCollector;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.DummyMetadataNodeSizeStats;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataCacheConfig;
 import org.apache.pulsar.metadata.api.MetadataEvent;
 import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
+import org.apache.pulsar.metadata.api.MetadataNodeSizeStats;
 import org.apache.pulsar.metadata.api.MetadataSerde;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import
org.apache.pulsar.metadata.api.Notification;
@@ -89,7 +91,12 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
 
     protected abstract CompletableFuture<Boolean> existsFromStore(String path);
 
-    protected AbstractMetadataStore(String metadataStoreName, OpenTelemetry openTelemetry) {
+    protected MetadataNodeSizeStats nodeSizeStats; // set non-null by the constructor (dummy fallback)
+
+    protected AbstractMetadataStore(String metadataStoreName, OpenTelemetry openTelemetry,
+                                    MetadataNodeSizeStats nodeSizeStats) {
+        this.nodeSizeStats = nodeSizeStats == null ? new DummyMetadataNodeSizeStats()
+                : nodeSizeStats;
         this.executor = new ScheduledThreadPoolExecutor(1,
                 new DefaultThreadFactory(
                         StringUtils.isNotBlank(metadataStoreName) ? metadataStoreName : getClass().getSimpleName()));
@@ -280,6 +287,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
         return storeGet(path)
                 .whenComplete((v, t) -> {
                     if (t != null) {
                         metadataStoreStats.recordGetOpsFailed(System.currentTimeMillis() - start);
                     } else {
+                        v.ifPresent(getResult -> nodeSizeStats.recordGetRes(path, getResult)); // v is null on failure
                         metadataStoreStats.recordGetOpsSucceeded(System.currentTimeMillis() - start);
@@ -302,7 +310,11 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
         if (!isValidPath(path)) {
             return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path));
         }
-        return childrenCache.get(path);
+        CompletableFuture<List<String>> listFuture = childrenCache.get(path);
+        listFuture.thenAccept((list) -> { // recorded on completion; callers still get the cache future
+            nodeSizeStats.recordGetChildrenRes(path, list);
+        });
+        return listFuture;
     }
 
     @Override
@@ -488,6 +500,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
                     NotificationType type = stat.isFirstVersion() ? NotificationType.Created
                             : NotificationType.Modified;
                     if (type == NotificationType.Created) {
+                        nodeSizeStats.recordPut(path, data); // only recorded when the node is first created
                         existsCache.synchronous().invalidate(path);
                         String parent = parent(path);
                         if (parent != null) {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
index e95f1947740..079cb3130e0 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
@@ -78,7 +78,8 @@ public class LocalMemoryMetadataStore extends AbstractMetadataStore implements M
 
     public LocalMemoryMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig)
             throws MetadataStoreException {
-        super(metadataStoreConfig.getMetadataStoreName(), metadataStoreConfig.getOpenTelemetry());
+        super(metadataStoreConfig.getMetadataStoreName(), metadataStoreConfig.getOpenTelemetry(),
+                metadataStoreConfig.getNodeSizeStats());
         String name = metadataURL.substring(MEMORY_SCHEME_IDENTIFIER.length());
         // Local means a private data set
         // update synchronizer and register sync listener
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
index 752fc7153cf..74bddda7454 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
@@ -209,7 +209,8 @@ public class RocksdbMetadataStore extends AbstractMetadataStore {
      */
     private RocksdbMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig)
             throws MetadataStoreException {
-        super(metadataStoreConfig.getMetadataStoreName(), metadataStoreConfig.getOpenTelemetry());
+        super(metadataStoreConfig.getMetadataStoreName(), metadataStoreConfig.getOpenTelemetry(),
+                metadataStoreConfig.getNodeSizeStats());
         this.metadataUrl = metadataURL;
         try {
             RocksDB.loadLibrary();
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index d3065fcaae2..5bf7e2272f0 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -31,7 +31,7 @@ import java.util.stream.Collectors;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.tuple.Triple;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataStore;
@@ -54,6 +54,7 @@ import org.apache.pulsar.metadata.impl.batching.OpDelete;
 import org.apache.pulsar.metadata.impl.batching.OpGet;
 import org.apache.pulsar.metadata.impl.batching.OpGetChildren;
 import org.apache.pulsar.metadata.impl.batching.OpPut;
+import org.apache.pulsar.metadata.impl.batching.ZKMetadataStoreBatchStrategy;
 import org.apache.zookeeper.AddWatchMode;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
@@ -110,6 +111,7 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
                 sessionWatcher = null;
             }
             zkc.addWatch("/", eventWatcher, AddWatchMode.PERSISTENT_RECURSIVE);
+            initBatchStrategy();
         } catch (Throwable t) {
             throw new MetadataStoreException(t);
         }
@@ -144,6 +146,14 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
         this.zkc = zkc;
         this.sessionWatcher = new ZKSessionWatcher(zkc, this::receivedSessionEvent);
         zkc.addWatch("/", eventWatcher, AddWatchMode.PERSISTENT_RECURSIVE);
+        initBatchStrategy();
+    }
+
+    private void initBatchStrategy() {
+        ZKMetadataStoreBatchStrategy batchStrategy =
+                new ZKMetadataStoreBatchStrategy(nodeSizeStats, maxOperations, maxSize, zkc);
+        this.maxSize = batchStrategy.maxSize(); // the strategy may adjust the configured limit
+        super.metadataStoreBatchStrategy = batchStrategy;
     }
 
     @Override
@@ -204,18 +214,34 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
             // Build the log warning message
             // summarize the operations by type
+            final int logThresholdPut = maxSize >> 4; // 1/16 of maxSize
+            final int logThresholdGet = maxSize >> 4; // 1/16 of maxSize
             String countsByType = ops.stream().collect(
                     Collectors.groupingBy(MetadataOp::getType, Collectors.summingInt(op -> 1)))
-                    .entrySet().stream().map(e -> e.getValue() + " " + e.getKey().name() + " entries")
+                    .entrySet().stream().map(e -> e.getValue() + " " + e.getKey().name())
                     .collect(Collectors.joining(", "));
-            List<Pair> opsForLog = ops.stream()
-                    .filter(item -> item.size() > 256 * 1024)
-                    .map(op -> Pair.of(op.getPath(), op.size()))
+            boolean shouldLimitLogLen = ops.size() > 16; // small batches: every op is logged
+            List<Triple<String, String, Integer>> opsForLog = ops.stream()
+                    .filter(item -> {
+                        if (!shouldLimitLogLen) {
+                            return true;
+                        }
+                        return switch (item.getType()) {
+                            case PUT -> item.asPut().getData().length > logThresholdPut;
+                            case GET -> nodeSizeStats
+                                    .getMaxSizeOfSameResourceType(item.getPath()) > logThresholdGet;
+                            case GET_CHILDREN -> nodeSizeStats
+                                    .getMaxChildrenCountOfSameResourceType(item.getPath()) > 512;
+                            default -> false;
+                        };
+                    })
+                    .map(op -> Triple.of(op.getPath(), op.getType().toString(), op.size()))
+                    .collect(Collectors.toList());
             Long totalSize = ops.stream().collect(Collectors.summingLong(MetadataOp::size));
             log.warn("Connection loss while executing batch operation of {} "
-                    + "of total data size of {}. "
-                    + "Retrying individual operations one-by-one. ops whose size > 256KB: {}",
+                    + "of total requested data size of {}. "
+                    + "Retrying individual operations one-by-one."
+ + " ops that maybe large: {}", countsByType, totalSize, opsForLog); // Retry with the individual operations diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java index 865213643c3..a9319a50fec 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.metadata.impl.batching; -import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; import java.util.List; @@ -28,6 +27,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; import org.apache.pulsar.metadata.api.GetResult; import org.apache.pulsar.metadata.api.MetadataEventSynchronizer; import org.apache.pulsar.metadata.api.MetadataStoreConfig; @@ -50,13 +50,14 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore private final boolean enabled; private final int maxDelayMillis; - private final int maxOperations; - private final int maxSize; + protected final int maxOperations; + protected int maxSize; private MetadataEventSynchronizer synchronizer; private final BatchMetadataStoreStats batchMetadataStoreStats; + protected MetadataStoreBatchStrategy metadataStoreBatchStrategy; protected AbstractBatchedMetadataStore(MetadataStoreConfig conf) { - super(conf.getMetadataStoreName(), conf.getOpenTelemetry()); + super(conf.getMetadataStoreName(), conf.getOpenTelemetry(), conf.getNodeSizeStats()); this.enabled = conf.isBatchingEnabled(); this.maxDelayMillis = conf.getBatchingMaxDelayMillis(); @@ -78,6 +79,7 @@ public 
abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore updateMetadataEventSynchronizer(conf.getSynchronizer()); this.batchMetadataStoreStats = new BatchMetadataStoreStats(metadataStoreName, executor, conf.getOpenTelemetry()); + this.metadataStoreBatchStrategy = new DefaultMetadataStoreBatchStrategy(maxOperations, maxSize); } @Override @@ -100,37 +102,16 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore } private void flush() { - while (!readOps.isEmpty()) { - List<MetadataOp> ops = new ArrayList<>(); - for (int i = 0; i < maxOperations; i++) { - MetadataOp op = readOps.poll(); - if (op == null) { - break; - } - ops.add(op); + List<MetadataOp> currentBatch; + if (!readOps.isEmpty()) { + while (CollectionUtils.isNotEmpty(currentBatch = metadataStoreBatchStrategy.nextBatch(readOps))) { + internalBatchOperation(currentBatch); } - internalBatchOperation(ops); } - - while (!writeOps.isEmpty()) { - int batchSize = 0; - - List<MetadataOp> ops = new ArrayList<>(); - for (int i = 0; i < maxOperations; i++) { - MetadataOp op = writeOps.peek(); - if (op == null) { - break; - } - - if (i > 0 && (batchSize + op.size()) > maxSize) { - // We have already reached the max size, so flush the current batch - break; - } - - batchSize += op.size(); - ops.add(writeOps.poll()); + if (!writeOps.isEmpty()) { + while (CollectionUtils.isNotEmpty(currentBatch = metadataStoreBatchStrategy.nextBatch(writeOps))) { + internalBatchOperation(currentBatch); } - internalBatchOperation(ops); } flushInProgress.set(false); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/DefaultMetadataStoreBatchStrategy.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/DefaultMetadataStoreBatchStrategy.java new file mode 100644 index 00000000000..766c2551775 --- /dev/null +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/DefaultMetadataStoreBatchStrategy.java @@ -0,0 +1,68 @@ +/* 
+ * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.metadata.impl.batching; + +import java.util.ArrayList; +import java.util.List; +import org.jctools.queues.MessagePassingQueue; + +/*** + * The default batching strategy, which only consider how the max operations and max size work for the request packet. + * And do not care about the response packet. + */ +public class DefaultMetadataStoreBatchStrategy implements MetadataStoreBatchStrategy { + + private final int maxOperations; + private final int maxPutSize; + + public DefaultMetadataStoreBatchStrategy(int maxOperations, int maxPutSize) { + this.maxOperations = maxOperations; + this.maxPutSize = maxPutSize; + } + + @Override + public List<MetadataOp> nextBatch(MessagePassingQueue<MetadataOp> opsSrc) { + int requestSize = 0; + List<MetadataOp> ops = new ArrayList<>(); + while (!opsSrc.isEmpty()) { + MetadataOp op = opsSrc.peek(); + if (op == null) { + break; + } + MetadataOp.Type type = op.getType(); + switch (type) { + case PUT: + case DELETE: { + requestSize += op.size(); + break; + } + default: {} + } + if (!ops.isEmpty() && requestSize > maxPutSize) { + // We have already reached the max size, so flush the current batch. 
+ break; + } + ops.add(opsSrc.poll()); + if (ops.size() == maxOperations) { + break; + } + } + return ops; + } +} \ No newline at end of file diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/MetadataStoreBatchStrategy.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/MetadataStoreBatchStrategy.java new file mode 100644 index 00000000000..e37d80c27b3 --- /dev/null +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/MetadataStoreBatchStrategy.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.metadata.impl.batching; + +import java.util.List; +import org.jctools.queues.MessagePassingQueue; + +/*** + * Used to split ops into multi batch. Because that one batch may can not afford all ops. + */ +public interface MetadataStoreBatchStrategy { + + /** + * Get the next batch of operations to execute. 
+ */ + List<MetadataOp> nextBatch(MessagePassingQueue<MetadataOp> ops); +} \ No newline at end of file diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/ZKMetadataStoreBatchStrategy.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/ZKMetadataStoreBatchStrategy.java new file mode 100644 index 00000000000..ff8cc74d4c1 --- /dev/null +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/ZKMetadataStoreBatchStrategy.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.metadata.impl.batching; + +import java.util.ArrayList; +import java.util.List; +import org.apache.pulsar.metadata.api.MetadataNodeSizeStats; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.client.ZKClientConfig; +import org.apache.zookeeper.common.ZKConfig; +import org.jctools.queues.MessagePassingQueue; + +public class ZKMetadataStoreBatchStrategy implements MetadataStoreBatchStrategy { + + // The headers of response command contains the following attributes, which cost 88 bytes. 
+ // Base attrs: xid(int), zxid(long), err(int), len(int) + // Stat attrs: czxid(long), mzxid(long), ctime(long), mtime(long), version(int), cversion(int), aversion(int) + // ephemeralOwner(long), dataLength(int), numChildren(int), pzxid(long) + // By the way, the length of response header may be different between different version, since we had use a half + // of max size, we can skip to consider the difference. + public static final int ZK_RESPONSE_HEADER_LEN = 88; + private final int defaultSize; + + private final int maxOperations; + private final int maxGetSize; + private final int maxPutSize; + private final MetadataNodeSizeStats nodeSizeStats; + + public ZKMetadataStoreBatchStrategy(MetadataNodeSizeStats nodeSizeStats, int maxOperations, int defaultMaxSize, + ZooKeeper zkc) { + int maxSizeConfigured = zkc.getClientConfig().getInt( + ZKConfig.JUTE_MAXBUFFER, + ZKClientConfig.CLIENT_MAX_PACKET_LENGTH_DEFAULT); + maxSizeConfigured = maxSizeConfigured > 0 ? maxSizeConfigured : defaultMaxSize; + this.maxOperations = maxOperations; + this.maxGetSize = maxSizeConfigured; + this.maxPutSize = maxSizeConfigured; + this.nodeSizeStats = nodeSizeStats; + // If the size of the node can not be calculated by "nodeSizeStats", at most package 8 ops into a batch. + this.defaultSize = Math.max(maxPutSize >>> 4, 1024); + } + + public int maxSize() { + return maxPutSize; + } + + @Override + public List<MetadataOp> nextBatch(MessagePassingQueue<MetadataOp> opsSrc) { + int requestSize = 0; + int estimatedResponseSize = 0; + // Since the response size is estimated, we use half of the max size to be safe. 
+ int maxGetSize = this.maxGetSize >>> 1; + List<MetadataOp> ops = new ArrayList<>(); + while (!opsSrc.isEmpty()) { + MetadataOp op = opsSrc.peek(); + if (op == null) { + break; + } + MetadataOp.Type type = op.getType(); + String path = op.getPath(); + switch (type) { + case GET_CHILDREN: { + estimatedResponseSize += ZK_RESPONSE_HEADER_LEN; + int childrenCount = nodeSizeStats.getMaxChildrenCountOfSameResourceType(path); + if (childrenCount < 0) { + estimatedResponseSize += defaultSize; + break; + } + // The way that combines list of nodes is as follows + // [4 bytes that indicates the length of the next item] [item name]. + // So we add 4 bytes for each item. + int size = nodeSizeStats.getMaxSizeOfSameResourceType(path); + if (size > 0) { + estimatedResponseSize += (childrenCount * (size + 4)); + } else { + estimatedResponseSize += (childrenCount * (defaultSize + 4)); + } + break; + } + case GET: { + estimatedResponseSize += ZK_RESPONSE_HEADER_LEN; + int size = nodeSizeStats.getMaxSizeOfSameResourceType(path); + if (size > 0) { + estimatedResponseSize += size; + } else { + estimatedResponseSize += defaultSize; + } + break; + } + case DELETE: + case PUT: { + requestSize += op.size(); + // The response of creation contains two attributes: stat and path, so we add them into the + // estimation response size. + estimatedResponseSize += ZK_RESPONSE_HEADER_LEN; + estimatedResponseSize += path.length(); + break; + } + default: { + estimatedResponseSize += ZK_RESPONSE_HEADER_LEN; + estimatedResponseSize += path.length(); + } + } + if (!ops.isEmpty() && (estimatedResponseSize > maxGetSize || requestSize > maxPutSize)) { + // We have already reached the max size, so flush the current batch. 
+ break; + } + ops.add(opsSrc.poll()); + if (ops.size() == maxOperations) { + break; + } + } + return ops; + } +} \ No newline at end of file diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java index c7e1ce24b08..7f0dc6fba10 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java @@ -61,7 +61,7 @@ public class OxiaMetadataStore extends AbstractMetadataStore { private Optional<MetadataEventSynchronizer> synchronizer; public OxiaMetadataStore(AsyncOxiaClient oxia, String identity) { - super("oxia-metadata", OpenTelemetry.noop()); + super("oxia-metadata", OpenTelemetry.noop(), null); this.client = oxia; this.identity = identity; this.synchronizer = Optional.empty(); @@ -74,7 +74,8 @@ public class OxiaMetadataStore extends AbstractMetadataStore { MetadataStoreConfig metadataStoreConfig, boolean enableSessionWatcher) throws Exception { - super("oxia-metadata", Objects.requireNonNull(metadataStoreConfig).getOpenTelemetry()); + super("oxia-metadata", Objects.requireNonNull(metadataStoreConfig).getOpenTelemetry(), + metadataStoreConfig.getNodeSizeStats()); var linger = metadataStoreConfig.getBatchingMaxDelayMillis(); if (!metadataStoreConfig.isBatchingEnabled()) { diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java index b9abaece9e4..d42b2228346 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java @@ -96,7 +96,7 @@ public class MetadataStoreFactoryImplTest { public static class 
MyMetadataStore extends AbstractMetadataStore { protected MyMetadataStore() { - super("custom", OpenTelemetry.noop()); + super("custom", OpenTelemetry.noop(), null); } @Override diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/batching/ZKMetadataStoreBatchStrategyTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/batching/ZKMetadataStoreBatchStrategyTest.java new file mode 100644 index 00000000000..4fe2611a4de --- /dev/null +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/batching/ZKMetadataStoreBatchStrategyTest.java @@ -0,0 +1,637 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.metadata.impl.batching; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import org.apache.pulsar.metadata.api.MetadataNodeSizeStats; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.client.ZKClientConfig; +import org.jctools.queues.MpscUnboundedArrayQueue; +import org.mockito.Mockito; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * Comprehensive test suite for ZKMetadataStoreBatchStrategy class. + * This test covers: + * - Constructor and configuration handling + * - Batch size calculations for different operation types + * - Size limit enforcement for requests and responses + * - Operation count limits + * - Edge cases and boundary conditions + */ +public class ZKMetadataStoreBatchStrategyTest { + + private ZKMetadataStoreBatchStrategy strategy; + private MetadataNodeSizeStats mockNodeSizeStats; + private ZooKeeper mockZooKeeper; + private ZKClientConfig mockClientConfig; + private MpscUnboundedArrayQueue<MetadataOp> operationQueue; + + private static final int DEFAULT_MAX_OPERATIONS = 100; + private static final int DEFAULT_MAX_SIZE = 1024 * 1024; // 1MB + private static final String TEST_PATH = "/test/path"; + + @BeforeMethod + public void setUp() { + mockNodeSizeStats = Mockito.mock(MetadataNodeSizeStats.class); + mockZooKeeper = Mockito.mock(ZooKeeper.class); + mockClientConfig = Mockito.mock(ZKClientConfig.class); + operationQueue = new MpscUnboundedArrayQueue<>(1000); + // Setup default mock behavior + Mockito.when(mockZooKeeper.getClientConfig()).thenReturn(mockClientConfig); + Mockito.when(mockClientConfig.getInt(Mockito.anyString(), Mockito.anyInt())) + .thenReturn(DEFAULT_MAX_SIZE); + // Setup default node size stats + Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(Mockito.anyString())) + .thenReturn(100); + 
Mockito.when(mockNodeSizeStats.getMaxChildrenCountOfSameResourceType(Mockito.anyString())) + .thenReturn(10); + strategy = new ZKMetadataStoreBatchStrategy(mockNodeSizeStats, DEFAULT_MAX_OPERATIONS, + DEFAULT_MAX_SIZE, mockZooKeeper); + } + + @Test + public void testConstructorWithDefaultMaxSize() { + // Test constructor uses default max size when ZK config returns 0 or negative + Mockito.when(mockClientConfig.getInt(Mockito.anyString(), Mockito.anyInt())) + .thenReturn(0); + ZKMetadataStoreBatchStrategy strategyWithDefault = new ZKMetadataStoreBatchStrategy( + mockNodeSizeStats, DEFAULT_MAX_OPERATIONS, DEFAULT_MAX_SIZE, mockZooKeeper); + assertEquals(strategyWithDefault.maxSize(), DEFAULT_MAX_SIZE); + } + + @Test + public void testConstructorWithConfiguredMaxSize() { + int configuredSize = 2 * 1024 * 1024; // 2MB + Mockito.when(mockClientConfig.getInt(Mockito.anyString(), Mockito.anyInt())) + .thenReturn(configuredSize); + ZKMetadataStoreBatchStrategy strategyWithConfig = new ZKMetadataStoreBatchStrategy( + mockNodeSizeStats, DEFAULT_MAX_OPERATIONS, DEFAULT_MAX_SIZE, mockZooKeeper); + assertEquals(strategyWithConfig.maxSize(), configuredSize); + } + + @Test + public void testMaxSizeMethod() { + assertEquals(strategy.maxSize(), DEFAULT_MAX_SIZE); + } + + @Test + public void testEmptyQueue() { + List<MetadataOp> batch = strategy.nextBatch(operationQueue); + assertTrue(batch.isEmpty()); + } + + @Test + public void testSingleGetOperation() { + MetadataOp getOp = createMockGetOperation(TEST_PATH); + operationQueue.offer(getOp); + List<MetadataOp> batch = strategy.nextBatch(operationQueue); + assertEquals(batch.size(), 1); + assertEquals(batch.get(0), getOp); + assertTrue(operationQueue.isEmpty()); + } + + @Test + public void testSingleGetChildrenOperation() { + MetadataOp getChildrenOp = createMockGetChildrenOperation(TEST_PATH); + operationQueue.offer(getChildrenOp); + List<MetadataOp> batch = strategy.nextBatch(operationQueue); + assertEquals(batch.size(), 1); + 
assertEquals(batch.get(0), getChildrenOp); + assertTrue(operationQueue.isEmpty()); + } + + @Test + public void testSinglePutOperation() { + MetadataOp putOp = createMockPutOperation(TEST_PATH, 100); + operationQueue.offer(putOp); + List<MetadataOp> batch = strategy.nextBatch(operationQueue); + assertEquals(batch.size(), 1); + assertEquals(batch.get(0), putOp); + assertTrue(operationQueue.isEmpty()); + } + + @Test + public void testSingleDeleteOperation() { + MetadataOp deleteOp = createMockDeleteOperation(TEST_PATH); + operationQueue.offer(deleteOp); + List<MetadataOp> batch = strategy.nextBatch(operationQueue); + assertEquals(batch.size(), 1); + assertEquals(batch.get(0), deleteOp); + assertTrue(operationQueue.isEmpty()); + } + + @Test + public void testMaxOperationsLimit() { + // Add more operations than the max limit + for (int i = 0; i < DEFAULT_MAX_OPERATIONS + 10; i++) { + operationQueue.offer(createMockGetOperation("/test/path/" + i)); + } + List<MetadataOp> batch = strategy.nextBatch(operationQueue); + assertEquals(batch.size(), DEFAULT_MAX_OPERATIONS); + assertEquals(operationQueue.size(), 10); // Remaining operations + } + + @Test + public void testRequestSizeLimit() { + // Create large PUT operations that exceed the request size limit + int largeSize = DEFAULT_MAX_SIZE / 2 + 1000; // Larger than half max size + operationQueue.offer(createMockPutOperation("/test/path1", largeSize)); + operationQueue.offer(createMockPutOperation("/test/path2", largeSize)); + operationQueue.offer(createMockPutOperation("/test/path3", 100)); // Small operation + List<MetadataOp> batch = strategy.nextBatch(operationQueue); + // Should only include the first operation due to size limit + assertEquals(batch.size(), 1); + assertEquals(operationQueue.size(), 2); // Two operations remaining + } + + // Helper methods for creating mock operations + private MetadataOp createMockGetOperation(String path) { + MetadataOp op = Mockito.mock(MetadataOp.class); + 
Mockito.when(op.getType()).thenReturn(MetadataOp.Type.GET); + Mockito.when(op.getPath()).thenReturn(path); + Mockito.when(op.size()).thenReturn(0); + Mockito.when(op.getFuture()).thenReturn(new CompletableFuture<>()); + return op; + } + + private MetadataOp createMockGetChildrenOperation(String path) { + MetadataOp op = Mockito.mock(MetadataOp.class); + Mockito.when(op.getType()).thenReturn(MetadataOp.Type.GET_CHILDREN); + Mockito.when(op.getPath()).thenReturn(path); + Mockito.when(op.size()).thenReturn(0); + Mockito.when(op.getFuture()).thenReturn(new CompletableFuture<>()); + return op; + } + + private MetadataOp createMockPutOperation(String path, int size) { + MetadataOp op = Mockito.mock(MetadataOp.class); + Mockito.when(op.getType()).thenReturn(MetadataOp.Type.PUT); + Mockito.when(op.getPath()).thenReturn(path); + Mockito.when(op.size()).thenReturn(size); + Mockito.when(op.getFuture()).thenReturn(new CompletableFuture<>()); + return op; + } + + private MetadataOp createMockDeleteOperation(String path) { + MetadataOp op = Mockito.mock(MetadataOp.class); + Mockito.when(op.getType()).thenReturn(MetadataOp.Type.DELETE); + Mockito.when(op.getPath()).thenReturn(path); + Mockito.when(op.size()).thenReturn(path.length()); + Mockito.when(op.getFuture()).thenReturn(new CompletableFuture<>()); + return op; + } + + @Test + public void testResponseSizeLimitForGetOperations() { + // Setup large response size for node stats + Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(Mockito.anyString())) + .thenReturn(DEFAULT_MAX_SIZE / 5); // Large response size + // Add multiple GET operations + for (int i = 0; i < 10; i++) { + operationQueue.offer(createMockGetOperation("/test/path/" + i)); + } + List<MetadataOp> batch = strategy.nextBatch(operationQueue); + // Should limit based on estimated response size (half of max size) + assertEquals(batch.size(), 2); + } + + @Test + public void testResponseSizeLimitForGetChildrenOperations() { + // Setup large response size for 
children operations + Mockito.when(mockNodeSizeStats.getMaxChildrenCountOfSameResourceType(Mockito.anyString())) + .thenReturn(1000); // Many children + Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(Mockito.anyString())) + .thenReturn(100); // Size per child + // Add multiple GET_CHILDREN operations + for (int i = 0; i < 10; i++) { + operationQueue.offer(createMockGetChildrenOperation("/test/path/" + i)); + } + List<MetadataOp> batch = strategy.nextBatch(operationQueue); + // Should limit based on estimated response size. + // DEFAULT_MAX_SIZE is 1024 * 1024, the half value is 1024 * 512. + // Per children response is 1000 * 100. + // So the result should be 5; + assertEquals(batch.size(), 5); + } + + @Test + public void testMixedOperationTypes() { + operationQueue.offer(createMockGetOperation("/test/get")); + operationQueue.offer(createMockPutOperation("/test/put", 100)); + operationQueue.offer(createMockDeleteOperation("/test/delete")); + operationQueue.offer(createMockGetChildrenOperation("/test/children")); + List<MetadataOp> batch = strategy.nextBatch(operationQueue); + assertEquals(batch.size(), 4); + assertTrue(operationQueue.isEmpty()); + } + + @Test + public void testNullOperationInQueue() { + // Test the case where peek() returns null (queue becomes empty during processing) + MetadataOp mockOp = createMockGetOperation("/test/path1"); + @SuppressWarnings("unchecked") + MpscUnboundedArrayQueue<MetadataOp> mockQueue = Mockito.mock(MpscUnboundedArrayQueue.class); + Mockito.when(mockQueue.isEmpty()).thenReturn(false, false, true); + Mockito.when(mockQueue.peek()).thenReturn(mockOp, (MetadataOp) null); + Mockito.when(mockQueue.poll()).thenReturn(mockOp); + List<MetadataOp> batch = strategy.nextBatch(mockQueue); + assertEquals(batch.size(), 1); + } + + @Test + public void testZKResponseHeaderCalculation() { + // Test that ZK_RESPONSE_HEADER_LEN is correctly used in calculations + assertEquals(ZKMetadataStoreBatchStrategy.ZK_RESPONSE_HEADER_LEN, 
88); + // Add a GET operation and verify header is included in size calculation + operationQueue.offer(createMockGetOperation(TEST_PATH)); + // The actual size calculation is internal, but we can verify the operation is processed + List<MetadataOp> batch = strategy.nextBatch(operationQueue); + assertEquals(batch.size(), 1); + } + + @Test + public void testDefaultSizeFallbackForGetOperations() { + // Test that defaultSize is used when node size stats return negative values + // defaultSize = Math.max(maxPutSize >>> 4, 1024) = Math.max(1MB >>> 4, 1024) = Math.max(65536, 1024) = 65536 + Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(Mockito.anyString())) + .thenReturn(-1); // Negative value should trigger defaultSize fallback + operationQueue.offer(createMockGetOperation("/test/path1")); + operationQueue.offer(createMockGetOperation("/test/path2")); + List<MetadataOp> batch = strategy.nextBatch(operationQueue); + // Should process operations using defaultSize for size estimation + assertEquals(batch.size(), 2); + assertTrue(operationQueue.isEmpty()); + } + + @Test + public void testDefaultSizeFallbackForGetChildrenOperations() { + // Test defaultSize fallback for GET_CHILDREN when childrenCount < 0 + Mockito.when(mockNodeSizeStats.getMaxChildrenCountOfSameResourceType(Mockito.anyString())) + .thenReturn(-1); // Negative value should trigger defaultSize fallback + operationQueue.offer(createMockGetChildrenOperation("/test/path1")); + operationQueue.offer(createMockGetChildrenOperation("/test/path2")); + List<MetadataOp> batch = strategy.nextBatch(operationQueue); + // Should process operations using defaultSize for size estimation + assertEquals(batch.size(), 2); + assertTrue(operationQueue.isEmpty()); + } + + @Test + public void testDefaultSizeFallbackForGetChildrenWithValidCountButInvalidSize() { + // Test defaultSize fallback when childrenCount >= 0 but size <= 0 + 
Mockito.when(mockNodeSizeStats.getMaxChildrenCountOfSameResourceType(Mockito.anyString())) + .thenReturn(5); // Valid children count + Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(Mockito.anyString())) + .thenReturn(-1); // Invalid size should trigger defaultSize fallback + operationQueue.offer(createMockGetChildrenOperation("/test/path")); + List<MetadataOp> batch = strategy.nextBatch(operationQueue); + assertEquals(batch.size(), 1); + assertTrue(operationQueue.isEmpty()); + } + + @Test + public void testDefaultOperationType() { + // The implementation doesn't handle null types, so let's test with a valid operation + // that would use the default case in the switch statement + MetadataOp putOp = createMockPutOperation(TEST_PATH, 50); + operationQueue.offer(putOp); + List<MetadataOp> batch = strategy.nextBatch(operationQueue); + assertEquals(batch.size(), 1); + assertEquals(batch.get(0), putOp); + } + + @Test + public void testLargePathNames() { + // Test with very long path names + String longPath = "/very/long/path/name/that/exceeds/normal/length/" + + "and/continues/for/a/very/long/time/to/test/path/handling/" + + "in/the/batch/strategy/implementation"; + operationQueue.offer(createMockPutOperation(longPath, 100)); + operationQueue.offer(createMockDeleteOperation(longPath)); + List<MetadataOp> batch = strategy.nextBatch(operationQueue); + assertEquals(batch.size(), 2); + assertTrue(operationQueue.isEmpty()); + } + + @Test + public void testZeroSizeOperations() { + // Test operations with zero size + operationQueue.offer(createMockPutOperation(TEST_PATH, 0)); + operationQueue.offer(createMockGetOperation(TEST_PATH)); + List<MetadataOp> batch = strategy.nextBatch(operationQueue); + assertEquals(batch.size(), 2); + assertTrue(operationQueue.isEmpty()); + } + + @Test + public void testNodeSizeStatsIntegration() { + String path1 = "/test/path1"; + String path2 = "/test/path2"; + + // Setup different sizes for different paths + 
Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(path1)).thenReturn(1000); + Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(path2)).thenReturn(2000); + Mockito.when(mockNodeSizeStats.getMaxChildrenCountOfSameResourceType(path1)).thenReturn(50); + Mockito.when(mockNodeSizeStats.getMaxChildrenCountOfSameResourceType(path2)).thenReturn(100); + + operationQueue.offer(createMockGetOperation(path1)); + operationQueue.offer(createMockGetChildrenOperation(path2)); + + List<MetadataOp> batch = strategy.nextBatch(operationQueue); + + assertEquals(batch.size(), 2); + + // Verify that the node size stats were consulted + Mockito.verify(mockNodeSizeStats).getMaxSizeOfSameResourceType(path1); + Mockito.verify(mockNodeSizeStats).getMaxChildrenCountOfSameResourceType(path2); + } + + @Test + public void testBatchSizeCalculationAccuracy() { + // Test that batch size calculation prevents oversized batches + + // Setup a scenario where the second operation would exceed the limit + Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(Mockito.anyString())) + .thenReturn(DEFAULT_MAX_SIZE / 3); // Each operation uses 1/3 of max size + + operationQueue.offer(createMockGetOperation("/test/path1")); + operationQueue.offer(createMockGetOperation("/test/path2")); + operationQueue.offer(createMockGetOperation("/test/path3")); // This should not fit + + List<MetadataOp> batch = strategy.nextBatch(operationQueue); + + // Should include first two operations but not the third + assertTrue(batch.size() <= 2); + assertTrue(batch.size() >= 1); + } + + @Test + public void testMixedValidAndInvalidNodeSizeStats() { + // Test mixing operations with valid and invalid node size stats + String validPath = "/valid/path"; + String invalidPath = "/invalid/path"; + + // Setup different responses for different paths + Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(validPath)).thenReturn(500); + 
Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(invalidPath)).thenReturn(-1); + Mockito.when(mockNodeSizeStats.getMaxChildrenCountOfSameResourceType(validPath)).thenReturn(10); + Mockito.when(mockNodeSizeStats.getMaxChildrenCountOfSameResourceType(invalidPath)).thenReturn(-1); + + operationQueue.offer(createMockGetOperation(validPath)); + operationQueue.offer(createMockGetOperation(invalidPath)); // Should use defaultSize + operationQueue.offer(createMockGetChildrenOperation(validPath)); + operationQueue.offer(createMockGetChildrenOperation(invalidPath)); // Should use defaultSize + + List<MetadataOp> batch = strategy.nextBatch(operationQueue); + + assertEquals(batch.size(), 4); + assertTrue(operationQueue.isEmpty()); + } + + @Test + public void testZeroSizeFromNodeStats() { + // Test handling of zero size from node stats (should use defaultSize) + Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(Mockito.anyString())) + .thenReturn(0); // Zero size should trigger defaultSize fallback + Mockito.when(mockNodeSizeStats.getMaxChildrenCountOfSameResourceType(Mockito.anyString())) + .thenReturn(5); // Valid children count + + operationQueue.offer(createMockGetOperation("/test/get")); + operationQueue.offer(createMockGetChildrenOperation("/test/children")); + + List<MetadataOp> batch = strategy.nextBatch(operationQueue); + + assertEquals(batch.size(), 2); + assertTrue(operationQueue.isEmpty()); + } + + @Test + public void testDynamicDefaultSizeCalculation() { + // Test that defaultSize is calculated dynamically based on maxPutSize + // defaultSize = Math.max(maxPutSize >>> 4, 1024) + + // Test with default configuration (1MB) + // defaultSize = Math.max(1048576 >>> 4, 1024) = Math.max(65536, 1024) = 65536 + assertEquals(strategy.maxSize(), DEFAULT_MAX_SIZE); // Verify maxPutSize + + // Test with smaller maxPutSize + int smallMaxSize = 8192; // 8KB + Mockito.when(mockClientConfig.getInt(Mockito.anyString(), Mockito.anyInt())) + 
.thenReturn(smallMaxSize); + + ZKMetadataStoreBatchStrategy smallStrategy = new ZKMetadataStoreBatchStrategy( + mockNodeSizeStats, DEFAULT_MAX_OPERATIONS, DEFAULT_MAX_SIZE, mockZooKeeper); + + assertEquals(smallStrategy.maxSize(), smallMaxSize); + + // For small maxPutSize, defaultSize should be Math.max(8192 >>> 4, 1024) = Math.max(512, 1024) = 1024 + // We can't directly access defaultSize, but we can test its behavior + Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(Mockito.anyString())) + .thenReturn(-1); // Force defaultSize usage + + MpscUnboundedArrayQueue<MetadataOp> smallQueue = new MpscUnboundedArrayQueue<>(1000); + smallQueue.offer(createMockGetOperation("/test/path")); + + List<MetadataOp> batch = smallStrategy.nextBatch(smallQueue); + assertEquals(batch.size(), 1); + } + + @Test + public void testDynamicDefaultSizeWithLargeMaxPutSize() { + // Test with very large maxPutSize + int largeMaxSize = 16 * 1024 * 1024; // 16MB + Mockito.when(mockClientConfig.getInt(Mockito.anyString(), Mockito.anyInt())) + .thenReturn(largeMaxSize); + + ZKMetadataStoreBatchStrategy largeStrategy = new ZKMetadataStoreBatchStrategy( + mockNodeSizeStats, DEFAULT_MAX_OPERATIONS, DEFAULT_MAX_SIZE, mockZooKeeper); + + assertEquals(largeStrategy.maxSize(), largeMaxSize); + + // For large maxPutSize, defaultSize should be Math.max(16MB >>> 4, 1024) = Math.max(1MB, 1024) = 1MB + // This allows for larger batches when size stats are unavailable + Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(Mockito.anyString())) + .thenReturn(-1); // Force defaultSize usage + + MpscUnboundedArrayQueue<MetadataOp> largeQueue = new MpscUnboundedArrayQueue<>(1000); + + // Add multiple operations - with larger defaultSize, more should fit + for (int i = 0; i < 20; i++) { + largeQueue.offer(createMockGetOperation("/test/path/" + i)); + } + + List<MetadataOp> batch = largeStrategy.nextBatch(largeQueue); + assertTrue(batch.size() > 0); + assertTrue(batch.size() <= 20); + } + + 
@Test + public void testDefaultSizeBehavior() { + // Test that the defaultSize field is used correctly for size calculations + // We can't directly access the private field, but we can test its behavior + + // When node stats return negative values, defaultSize should be used + Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(Mockito.anyString())) + .thenReturn(-1); + + // Add many GET operations to test if defaultSize is being used for calculations + for (int i = 0; i < 100; i++) { + operationQueue.offer(createMockGetOperation("/test/path/" + i)); + } + + List<MetadataOp> batch = strategy.nextBatch(operationQueue); + + // Should be able to process some operations (exact number depends on defaultSize calculation) + assertTrue(batch.size() >= 7); + assertTrue(batch.size() <= 8); + } + + @Test + public void testGetChildrenWithZeroChildrenCount() { + // Test GET_CHILDREN with zero children count (valid case) + Mockito.when(mockNodeSizeStats.getMaxChildrenCountOfSameResourceType(Mockito.anyString())) + .thenReturn(0); // Zero children is valid + Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(Mockito.anyString())) + .thenReturn(100); // Valid size + + operationQueue.offer(createMockGetChildrenOperation("/empty/directory")); + + List<MetadataOp> batch = strategy.nextBatch(operationQueue); + + assertEquals(batch.size(), 1); + assertTrue(operationQueue.isEmpty()); + } + + @Test + public void testEnhancedSizeCalculationLogic() { + // Test the enhanced size calculation logic with various scenarios + + // Scenario 1: Valid stats + String path1 = "/valid/stats"; + Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(path1)).thenReturn(200); + Mockito.when(mockNodeSizeStats.getMaxChildrenCountOfSameResourceType(path1)).thenReturn(5); + + // Scenario 2: Invalid children count, valid size + String path2 = "/invalid/children"; + Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(path2)).thenReturn(300); + 
Mockito.when(mockNodeSizeStats.getMaxChildrenCountOfSameResourceType(path2)).thenReturn(-1); + + // Scenario 3: Valid children count, invalid size + String path3 = "/invalid/size"; + Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(path3)).thenReturn(-1); + Mockito.when(mockNodeSizeStats.getMaxChildrenCountOfSameResourceType(path3)).thenReturn(3); + + operationQueue.offer(createMockGetOperation(path1)); + operationQueue.offer(createMockGetChildrenOperation(path1)); + operationQueue.offer(createMockGetChildrenOperation(path2)); // Should use defaultSize + operationQueue.offer(createMockGetChildrenOperation(path3)); // Should use defaultSize for size + operationQueue.offer(createMockGetOperation(path2)); + operationQueue.offer(createMockGetOperation(path3)); // Should use defaultSize + + List<MetadataOp> batch = strategy.nextBatch(operationQueue); + + // All operations should be processed + assertEquals(batch.size(), 6); + assertTrue(operationQueue.isEmpty()); + } + + @Test + public void testDefaultSizeCalculationFormula() { + // Test the specific formula: defaultSize = Math.max(maxPutSize >>> 4, 1024) + + // Test case 1: maxPutSize = 1MB (default), defaultSize should be Math.max(65536, 1024) = 65536 + int maxSize1 = 1024 * 1024; // 1MB + Mockito.when(mockClientConfig.getInt(Mockito.anyString(), Mockito.anyInt())) + .thenReturn(maxSize1); + + ZKMetadataStoreBatchStrategy strategy1 = new ZKMetadataStoreBatchStrategy( + mockNodeSizeStats, DEFAULT_MAX_OPERATIONS, DEFAULT_MAX_SIZE, mockZooKeeper); + + // Test case 2: maxPutSize = 8KB, defaultSize should be Math.max(512, 1024) = 1024 + int maxSize2 = 8 * 1024; // 8KB + Mockito.when(mockClientConfig.getInt(Mockito.anyString(), Mockito.anyInt())) + .thenReturn(maxSize2); + + ZKMetadataStoreBatchStrategy strategy2 = new ZKMetadataStoreBatchStrategy( + mockNodeSizeStats, DEFAULT_MAX_OPERATIONS, DEFAULT_MAX_SIZE, mockZooKeeper); + + // Test case 3: maxPutSize = 32MB, defaultSize should be Math.max(2MB, 1024) = 2MB + 
int maxSize3 = 32 * 1024 * 1024; // 32MB + Mockito.when(mockClientConfig.getInt(Mockito.anyString(), Mockito.anyInt())) + .thenReturn(maxSize3); + + ZKMetadataStoreBatchStrategy strategy3 = new ZKMetadataStoreBatchStrategy( + mockNodeSizeStats, DEFAULT_MAX_OPERATIONS, DEFAULT_MAX_SIZE, mockZooKeeper); + + // Verify maxSize() returns the configured values + assertEquals(strategy1.maxSize(), maxSize1); + assertEquals(strategy2.maxSize(), maxSize2); + assertEquals(strategy3.maxSize(), maxSize3); + + // Test behavior with invalid node stats to verify defaultSize usage + Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(Mockito.anyString())) + .thenReturn(-1); + + // All strategies should be able to process operations using their respective defaultSize + MpscUnboundedArrayQueue<MetadataOp> queue1 = new MpscUnboundedArrayQueue<>(10); + MpscUnboundedArrayQueue<MetadataOp> queue2 = new MpscUnboundedArrayQueue<>(10); + MpscUnboundedArrayQueue<MetadataOp> queue3 = new MpscUnboundedArrayQueue<>(10); + + queue1.offer(createMockGetOperation("/test1")); + queue2.offer(createMockGetOperation("/test2")); + queue3.offer(createMockGetOperation("/test3")); + + List<MetadataOp> batch1 = strategy1.nextBatch(queue1); + List<MetadataOp> batch2 = strategy2.nextBatch(queue2); + List<MetadataOp> batch3 = strategy3.nextBatch(queue3); + + assertEquals(batch1.size(), 1); + assertEquals(batch2.size(), 1); + assertEquals(batch3.size(), 1); + } + + @Test + public void testDefaultSizeMinimumValue() { + // Test that defaultSize never goes below 1024 bytes + int verySmallMaxSize = 1024; // 1KB + Mockito.when(mockClientConfig.getInt(Mockito.anyString(), Mockito.anyInt())) + .thenReturn(verySmallMaxSize); + + ZKMetadataStoreBatchStrategy smallStrategy = new ZKMetadataStoreBatchStrategy( + mockNodeSizeStats, DEFAULT_MAX_OPERATIONS, DEFAULT_MAX_SIZE, mockZooKeeper); + + // defaultSize should be Math.max(1024 >>> 4, 1024) = Math.max(64, 1024) = 1024 + assertEquals(smallStrategy.maxSize(), 
verySmallMaxSize); + + // Test that operations can still be processed even with minimum defaultSize + Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(Mockito.anyString())) + .thenReturn(-1); + + MpscUnboundedArrayQueue<MetadataOp> smallQueue = new MpscUnboundedArrayQueue<>(10); + smallQueue.offer(createMockGetOperation("/test/small")); + + List<MetadataOp> batch = smallStrategy.nextBatch(smallQueue); + assertEquals(batch.size(), 1); + } +} diff --git a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java index 2682f038df2..c8f37e9b3fc 100644 --- a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java +++ b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java @@ -53,6 +53,7 @@ import org.apache.zookeeper.AsyncCallback.StringCallback; import org.apache.zookeeper.AsyncCallback.VoidCallback; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.client.ZKClientConfig; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.proto.DeleteRequest; @@ -138,6 +139,7 @@ public class MockZooKeeper extends ZooKeeper { private int referenceCount; private List<AutoCloseable> closeables; private int sessionTimeout; + private ZKClientConfig zKClientConfig = new ZKClientConfig(); //see details of Objenesis caching - http://objenesis.org/details.html //see supported jvms - https://github.com/easymock/objenesis/blob/master/SupportedJVMs.md @@ -190,6 +192,7 @@ public class MockZooKeeper extends ZooKeeper { zk.sequentialIdGenerator = new AtomicLong(); zk.closeables = new ArrayList<>(); zk.sessionTimeout = 30_000; + zk.zKClientConfig = new ZKClientConfig(); return zk; } @@ -236,6 +239,11 @@ public class MockZooKeeper extends ZooKeeper { return runInExecutorReturningValue(() -> internalCreate(path, data, createMode)); } + @Override + public 
ZKClientConfig getClientConfig() { + return zKClientConfig; + } + private <T> T runInExecutorReturningValue(Callable<T> task) throws InterruptedException, KeeperException { if (isStopped()) { diff --git a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java index 766f70979aa..0da88c03c2e 100644 --- a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java +++ b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java @@ -27,6 +27,7 @@ import org.apache.zookeeper.AsyncCallback.ChildrenCallback; import org.apache.zookeeper.AsyncCallback.DataCallback; import org.apache.zookeeper.AsyncCallback.StatCallback; import org.apache.zookeeper.AsyncCallback.VoidCallback; +import org.apache.zookeeper.client.ZKClientConfig; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; import org.objenesis.Objenesis; @@ -50,6 +51,8 @@ public class MockZooKeeperSession extends ZooKeeper { private int sessionTimeout = -1; + private ZKClientConfig zkClientConfig = new ZKClientConfig(); + public static MockZooKeeperSession newInstance(MockZooKeeper mockZooKeeper) { return newInstance(mockZooKeeper, true); } @@ -61,6 +64,7 @@ public class MockZooKeeperSession extends ZooKeeper { mockZooKeeperSession.mockZooKeeper = mockZooKeeper; mockZooKeeperSession.sessionId = sessionIdGenerator.getAndIncrement(); mockZooKeeperSession.closeMockZooKeeperOnClose = closeMockZooKeeperOnClose; + mockZooKeeperSession.zkClientConfig = new ZKClientConfig(); if (closeMockZooKeeperOnClose) { mockZooKeeper.increaseRefCount(); } @@ -74,6 +78,11 @@ public class MockZooKeeperSession extends ZooKeeper { assert false; } + @Override + public ZKClientConfig getClientConfig() { + return zkClientConfig; + } + @Override public int getSessionTimeout() { if (sessionTimeout > 0) {