This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 63a37624374fa625d08b02d32ac5f3780db030be Author: Dave Maughan <[email protected]> AuthorDate: Fri Jul 1 04:05:58 2022 +0100 [improve][broker] Optimise msgOutCounter and bytesOutCounter (#16214) (#16286) AbstractTopic#getBytesOutCounter and AbstractTopic#getMsgOutCounter both generate full stats only to pick the single required counter for the getters. These can be optimised to perform only the necessary work to calculate the counters. Provided a scoped implementation of the above methods for the single counter required for each getter. (cherry picked from commit 07c46fe9ef42a72de18be3267e70a51bbc883678) --- .../broker/service/AbstractSubscription.java | 42 ++++++++++++ .../pulsar/broker/service/AbstractTopic.java | 17 ++++- .../org/apache/pulsar/broker/service/Consumer.java | 8 +++ .../nonpersistent/NonPersistentSubscription.java | 6 +- .../service/nonpersistent/NonPersistentTopic.java | 4 -- .../service/persistent/PersistentSubscription.java | 7 +- .../broker/service/persistent/PersistentTopic.java | 4 -- .../broker/service/AbstractSubscriptionTest.java | 58 ++++++++++++++++ .../pulsar/broker/service/AbstractTopicTest.java | 75 +++++++++++++++++++++ .../apache/pulsar/broker/service/ConsumerTest.java | 77 ++++++++++++++++++++++ 10 files changed, 279 insertions(+), 19 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractSubscription.java new file mode 100644 index 00000000000..6a386670556 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractSubscription.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import java.util.Optional; +import java.util.concurrent.atomic.LongAdder; +import java.util.function.ToLongFunction; + +public abstract class AbstractSubscription implements Subscription { + protected final LongAdder bytesOutFromRemovedConsumers = new LongAdder(); + protected final LongAdder msgOutFromRemovedConsumer = new LongAdder(); + + public long getMsgOutCounter() { + return msgOutFromRemovedConsumer.longValue() + sumConsumers(Consumer::getMsgOutCounter); + } + + public long getBytesOutCounter() { + return bytesOutFromRemovedConsumers.longValue() + sumConsumers(Consumer::getBytesOutCounter); + } + + private long sumConsumers(ToLongFunction<Consumer> toCounter) { + return Optional.ofNullable(getDispatcher()) + .map(dispatcher -> dispatcher.getConsumers().stream().mapToLong(toCounter).sum()) + .orElse(0L); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 286eccb8bd6..917dec0b423 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.ToLongFunction; import lombok.Getter; import org.apache.bookkeeper.mledger.util.StatsBuckets; import org.apache.commons.collections4.CollectionUtils; @@ -138,6 +139,9 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP AtomicLongFieldUpdater.newUpdater(AbstractTopic.class, "usageCount"); private volatile long usageCount = 0; + protected final LongAdder msgOutFromRemovedSubscriptions = new LongAdder(); + protected final LongAdder bytesOutFromRemovedSubscriptions = new LongAdder(); + public AbstractTopic(String topic, BrokerService brokerService) { this.topic = topic; this.brokerService = brokerService; @@ -1041,11 +1045,20 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP } public long getMsgOutCounter() { - return getStats(false, false, false).msgOutCounter; + return msgOutFromRemovedSubscriptions.longValue() + + sumSubscriptions(AbstractSubscription::getMsgOutCounter); } public long getBytesOutCounter() { - return getStats(false, false, false).bytesOutCounter; + return bytesOutFromRemovedSubscriptions.longValue() + + sumSubscriptions(AbstractSubscription::getBytesOutCounter); + } + + private long sumSubscriptions(ToLongFunction<AbstractSubscription> toCounter) { + return getSubscriptions().values().stream() + .map(AbstractSubscription.class::cast) + .mapToLong(toCounter) + .sum(); } public boolean isDeleteWhileInactive() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 627c06b3976..bf5b4085561 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -801,6 +801,14 @@ public class Consumer { return stats; } + public long getMsgOutCounter() { + return msgOutCounter.longValue(); + } + + public long getBytesOutCounter() { + return bytesOutCounter.longValue(); + } + public int getUnackedMessages() { return unackedMessages; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java index fed3b0f851a..c3399bda4bd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java @@ -24,11 +24,11 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.LongAdder; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.intercept.BrokerInterceptor; +import org.apache.pulsar.broker.service.AbstractSubscription; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException; @@ -48,7 +48,7 @@ import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class NonPersistentSubscription implements Subscription { +public class NonPersistentSubscription extends AbstractSubscription implements Subscription { private final NonPersistentTopic topic; private volatile NonPersistentDispatcher dispatcher; private final String topicName; @@ -66,8 +66,6 @@ public class NonPersistentSubscription implements Subscription { // Timestamp of when this subscription was last seen active private volatile long lastActive; - private final LongAdder bytesOutFromRemovedConsumers = new LongAdder(); - private final LongAdder msgOutFromRemovedConsumer = new LongAdder(); private volatile Map<String, String> subscriptionProperties; // If isDurable is false(such as a Reader), remove subscription from topic when closing this subscription. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index fb6da69eb2f..bb62cf0f39f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -38,7 +38,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLongFieldUpdater; -import java.util.concurrent.atomic.LongAdder; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.Position; import org.apache.pulsar.broker.PulsarServerException; @@ -107,9 +106,6 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol AtomicLongFieldUpdater.newUpdater(NonPersistentTopic.class, "entriesAddedCounter"); private volatile long entriesAddedCounter = 0; - private final LongAdder bytesOutFromRemovedSubscriptions = new LongAdder(); - private final LongAdder msgOutFromRemovedSubscriptions = new LongAdder(); - private static final FastThreadLocal<TopicStats> threadLocalTopicStats = new FastThreadLocal<TopicStats>() { @Override protected TopicStats initialValue() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 31ba5115ef6..cab555bb9cd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -30,7 +30,6 @@ import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.LongAdder; import java.util.stream.Collectors; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback; @@ -52,6 +51,7 @@ import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.intercept.BrokerInterceptor; +import org.apache.pulsar.broker.service.AbstractSubscription; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException; import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; @@ -81,7 +81,7 @@ import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class PersistentSubscription implements Subscription { +public class PersistentSubscription extends AbstractSubscription implements Subscription { protected final PersistentTopic topic; protected final ManagedCursor cursor; protected volatile Dispatcher dispatcher; @@ -115,9 +115,6 @@ public class PersistentSubscription implements Subscription { private final PendingAckHandle pendingAckHandle; private volatile Map<String, String> subscriptionProperties; - private final LongAdder bytesOutFromRemovedConsumers = new LongAdder(); - private final LongAdder msgOutFromRemovedConsumer = new LongAdder(); - static { REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 268ad1f0b8b..be5d6095e01 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -46,7 +46,6 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.LongAdder; import java.util.function.BiFunction; import java.util.stream.Collectors; import lombok.Getter; @@ -220,9 +219,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal @Getter protected final TransactionBuffer transactionBuffer; - private final LongAdder bytesOutFromRemovedSubscriptions = new LongAdder(); - private final LongAdder msgOutFromRemovedSubscriptions = new LongAdder(); - // Record the last time a data message (ie: not an internal Pulsar marker) is published on the topic private long lastDataMessagePublishedTimestamp = 0; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractSubscriptionTest.java new file mode 100644 index 00000000000..fbc2ecf8059 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractSubscriptionTest.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; + +import java.util.Collections; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class AbstractSubscriptionTest { + private Consumer consumer; + private AbstractSubscription subscription; + + @BeforeMethod + public void beforeMethod() { + Dispatcher dispatcher = mock(Dispatcher.class); + consumer = mock(Consumer.class); + subscription = spy(AbstractSubscription.class); + + when(subscription.getDispatcher()).thenReturn(dispatcher); + when(dispatcher.getConsumers()).thenReturn(Collections.singletonList(consumer)); + } + + @Test + public void testGetMsgOutCounter() { + subscription.msgOutFromRemovedConsumer.add(1L); + when(consumer.getMsgOutCounter()).thenReturn(2L); + assertEquals(subscription.getMsgOutCounter(), 3L); + } + + @Test + public void testGetBytesOutCounter() { + subscription.bytesOutFromRemovedConsumers.add(1L); + when(consumer.getBytesOutCounter()).thenReturn(2L); + assertEquals(subscription.getBytesOutCounter(), 3L); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractTopicTest.java new file mode 100644 index 00000000000..fb7890dc57f --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractTopicTest.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import static org.mockito.Mockito.CALLS_REAL_METHODS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.withSettings; +import static org.testng.Assert.assertEquals; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class AbstractTopicTest { + private AbstractSubscription subscription; + private AbstractTopic topic; + + @BeforeMethod + public void beforeMethod() { + BrokerService brokerService = mock(BrokerService.class); + PulsarService pulsarService = mock(PulsarService.class); + ServiceConfiguration serviceConfiguration = mock(ServiceConfiguration.class); + BacklogQuotaManager backlogQuotaManager = mock(BacklogQuotaManager.class); + subscription = mock(AbstractSubscription.class); + + when(brokerService.pulsar()).thenReturn(pulsarService); + when(pulsarService.getConfiguration()).thenReturn(serviceConfiguration); + when(brokerService.getBacklogQuotaManager()).thenReturn(backlogQuotaManager); + + topic = mock(AbstractTopic.class, withSettings() + .useConstructor("topic", brokerService) + .defaultAnswer(CALLS_REAL_METHODS)); + + ConcurrentOpenHashMap<String, Subscription> subscriptions = + ConcurrentOpenHashMap.<String, Subscription>newBuilder() + .expectedItems(16) + .concurrencyLevel(1) + .build(); + subscriptions.put("subscription", subscription); + when(topic.getSubscriptions()).thenAnswer(invocation -> subscriptions); + } + + @Test + public void testGetMsgOutCounter() { + topic.msgOutFromRemovedSubscriptions.add(1L); + when(subscription.getMsgOutCounter()).thenReturn(2L); + assertEquals(topic.getMsgOutCounter(), 3L); + } + + @Test + public void testGetBytesOutCounter() { + topic.bytesOutFromRemovedSubscriptions.add(1L); + when(subscription.getBytesOutCounter()).thenReturn(2L); + assertEquals(topic.getBytesOutCounter(), 3L); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerTest.java new file mode 100644 index 00000000000..8d36f55ae49 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerTest.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import static java.util.Collections.emptyMap; +import static org.apache.pulsar.client.api.MessageId.latest; +import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Exclusive; +import static org.apache.pulsar.common.api.proto.KeySharedMode.AUTO_SPLIT; +import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import java.net.SocketAddress; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.common.api.proto.CommandSubscribe; +import org.apache.pulsar.common.api.proto.KeySharedMeta; +import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class ConsumerTest { + private Consumer consumer; + private final ConsumerStatsImpl stats = new ConsumerStatsImpl(); + + @BeforeMethod + public void beforeMethod() { + Subscription subscription = mock(Subscription.class); + ServerCnx cnx = mock(ServerCnx.class); + SocketAddress address = mock(SocketAddress.class); + Topic topic = mock(Topic.class); + BrokerService brokerService = mock(BrokerService.class); + PulsarService pulsarService = mock(PulsarService.class); + ServiceConfiguration serviceConfiguration = mock(ServiceConfiguration.class); + + when(cnx.clientAddress()).thenReturn(address); + when(subscription.getTopic()).thenReturn(topic); + when(topic.getBrokerService()).thenReturn(brokerService); + when(brokerService.getPulsar()).thenReturn(pulsarService); + when(pulsarService.getConfiguration()).thenReturn(serviceConfiguration); + + consumer = + new Consumer(subscription, Exclusive, "topic", 1, 0, "Cons1", true, cnx, "myrole-1", emptyMap(), false, + CommandSubscribe.InitialPosition.Earliest, new KeySharedMeta().setKeySharedMode(AUTO_SPLIT), latest, DEFAULT_CONSUMER_EPOCH); + } + + @Test + public void testGetMsgOutCounter() { + stats.msgOutCounter = 1L; + consumer.updateStats(stats); + assertEquals(consumer.getMsgOutCounter(), 1L); + } + + @Test + public void testGetBytesOutCounter() { + stats.bytesOutCounter = 1L; + consumer.updateStats(stats); + assertEquals(consumer.getBytesOutCounter(), 1L); + } +}
