This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 41c7dca8c93 [improve][broker] Optimise msgOutCounter and bytesOutCounter (#16771)
41c7dca8c93 is described below
commit 41c7dca8c938752702f9fe004ef99bb159e468f4
Author: Dave Maughan <[email protected]>
AuthorDate: Mon Aug 8 15:18:50 2022 +0100
[improve][broker] Optimise msgOutCounter and bytesOutCounter (#16771)
---
.../broker/service/AbstractSubscription.java | 42 ++++++++++++
.../pulsar/broker/service/AbstractTopic.java | 17 ++++-
.../org/apache/pulsar/broker/service/Consumer.java | 8 +++
.../nonpersistent/NonPersistentSubscription.java | 7 +-
.../service/nonpersistent/NonPersistentTopic.java | 4 --
.../service/persistent/PersistentSubscription.java | 7 +-
.../broker/service/persistent/PersistentTopic.java | 4 --
.../broker/service/AbstractSubscriptionTest.java | 58 +++++++++++++++++
.../pulsar/broker/service/AbstractTopicTest.java | 75 ++++++++++++++++++++++
.../apache/pulsar/broker/service/ConsumerTest.java | 75 ++++++++++++++++++++++
10 files changed, 277 insertions(+), 20 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractSubscription.java
new file mode 100644
index 00000000000..6a386670556
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractSubscription.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.util.Optional;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.ToLongFunction;
+
+public abstract class AbstractSubscription implements Subscription {
+ protected final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
+ protected final LongAdder msgOutFromRemovedConsumer = new LongAdder();
+
+ public long getMsgOutCounter() {
+ return msgOutFromRemovedConsumer.longValue() + sumConsumers(Consumer::getMsgOutCounter);
+ }
+
+ public long getBytesOutCounter() {
+ return bytesOutFromRemovedConsumers.longValue() + sumConsumers(Consumer::getBytesOutCounter);
+ }
+
+ private long sumConsumers(ToLongFunction<Consumer> toCounter) {
+ return Optional.ofNullable(getDispatcher())
+ .map(dispatcher -> dispatcher.getConsumers().stream().mapToLong(toCounter).sum())
+ .orElse(0L);
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 69c51c77d44..1307001a720 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.ToLongFunction;
import lombok.Getter;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.commons.lang3.tuple.Pair;
@@ -136,6 +137,9 @@ public abstract class AbstractTopic implements Topic {
private volatile long lastTopicMaxMessageSizeCheckTimeStamp = 0;
private final long topicMaxMessageSizeCheckIntervalMs;
+ protected final LongAdder msgOutFromRemovedSubscriptions = new LongAdder();
+ protected final LongAdder bytesOutFromRemovedSubscriptions = new LongAdder();
+
public AbstractTopic(String topic, BrokerService brokerService) {
this.topic = topic;
this.brokerService = brokerService;
@@ -875,11 +879,20 @@ public abstract class AbstractTopic implements Topic {
}
public long getMsgOutCounter() {
- return getStats(false, false).msgOutCounter;
+ return msgOutFromRemovedSubscriptions.longValue()
+ + sumSubscriptions(AbstractSubscription::getMsgOutCounter);
}
public long getBytesOutCounter() {
- return getStats(false, false).bytesOutCounter;
+ return bytesOutFromRemovedSubscriptions.longValue()
+ + sumSubscriptions(AbstractSubscription::getBytesOutCounter);
+ }
+
+ private long sumSubscriptions(ToLongFunction<AbstractSubscription> toCounter) {
+ return getSubscriptions().values().stream()
+ .map(AbstractSubscription.class::cast)
+ .mapToLong(toCounter)
+ .sum();
}
public boolean isDeleteWhileInactive() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 2661d29110c..52a91c5ef3e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -780,6 +780,14 @@ public class Consumer {
return stats;
}
+ public long getMsgOutCounter() {
+ return msgOutCounter.longValue();
+ }
+
+ public long getBytesOutCounter() {
+ return bytesOutCounter.longValue();
+ }
+
public int getUnackedMessages() {
return unackedMessages;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index cb515cdbd97..b8a7df36af2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -24,11 +24,11 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
+import org.apache.pulsar.broker.service.AbstractSubscription;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
@@ -48,7 +48,7 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class NonPersistentSubscription implements Subscription {
+public class NonPersistentSubscription extends AbstractSubscription implements Subscription {
private final NonPersistentTopic topic;
private volatile NonPersistentDispatcher dispatcher;
private final String topicName;
@@ -66,9 +66,6 @@ public class NonPersistentSubscription implements Subscription {
// Timestamp of when this subscription was last seen active
private volatile long lastActive;
- private final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
- private final LongAdder msgOutFromRemovedConsumer = new LongAdder();
-
// If isDurable is false(such as a Reader), remove subscription from topic when closing this subscription.
private final boolean isDurable;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 3846fc129b0..0c2823a131a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -34,7 +34,6 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.PulsarServerException;
@@ -102,9 +101,6 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
AtomicLongFieldUpdater.newUpdater(NonPersistentTopic.class, "entriesAddedCounter");
private volatile long entriesAddedCounter = 0;
- private final LongAdder bytesOutFromRemovedSubscriptions = new LongAdder();
- private final LongAdder msgOutFromRemovedSubscriptions = new LongAdder();
-
private static final FastThreadLocal<TopicStats> threadLocalTopicStats = new FastThreadLocal<TopicStats>() {
@Override
protected TopicStats initialValue() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index b119f3ca5d2..93ad2e80a64 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -29,7 +29,6 @@ import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
@@ -50,6 +49,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
+import org.apache.pulsar.broker.service.AbstractSubscription;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
@@ -80,7 +80,7 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class PersistentSubscription implements Subscription {
+public class PersistentSubscription extends AbstractSubscription implements Subscription {
protected final PersistentTopic topic;
protected final ManagedCursor cursor;
protected volatile Dispatcher dispatcher;
@@ -113,9 +113,6 @@ public class PersistentSubscription implements Subscription {
private volatile ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache;
private final PendingAckHandle pendingAckHandle;
- private final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
- private final LongAdder msgOutFromRemovedConsumer = new LongAdder();
-
static {
REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 932def13db9..5f2fd5ea1d9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -44,7 +44,6 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import lombok.Getter;
@@ -222,9 +221,6 @@ public class PersistentTopic extends AbstractTopic
@Getter
protected final TransactionBuffer transactionBuffer;
- private final LongAdder bytesOutFromRemovedSubscriptions = new LongAdder();
- private final LongAdder msgOutFromRemovedSubscriptions = new LongAdder();
-
// Record the last time a data message (ie: not an internal Pulsar marker) is published on the topic
private long lastDataMessagePublishedTimestamp = 0;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractSubscriptionTest.java
new file mode 100644
index 00000000000..fbc2ecf8059
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractSubscriptionTest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+
+import java.util.Collections;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class AbstractSubscriptionTest {
+ private Consumer consumer;
+ private AbstractSubscription subscription;
+
+ @BeforeMethod
+ public void beforeMethod() {
+ Dispatcher dispatcher = mock(Dispatcher.class);
+ consumer = mock(Consumer.class);
+ subscription = spy(AbstractSubscription.class);
+
+ when(subscription.getDispatcher()).thenReturn(dispatcher);
+ when(dispatcher.getConsumers()).thenReturn(Collections.singletonList(consumer));
+ }
+
+ @Test
+ public void testGetMsgOutCounter() {
+ subscription.msgOutFromRemovedConsumer.add(1L);
+ when(consumer.getMsgOutCounter()).thenReturn(2L);
+ assertEquals(subscription.getMsgOutCounter(), 3L);
+ }
+
+ @Test
+ public void testGetBytesOutCounter() {
+ subscription.bytesOutFromRemovedConsumers.add(1L);
+ when(consumer.getBytesOutCounter()).thenReturn(2L);
+ assertEquals(subscription.getBytesOutCounter(), 3L);
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractTopicTest.java
new file mode 100644
index 00000000000..fb7890dc57f
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractTopicTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.withSettings;
+import static org.testng.Assert.assertEquals;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class AbstractTopicTest {
+ private AbstractSubscription subscription;
+ private AbstractTopic topic;
+
+ @BeforeMethod
+ public void beforeMethod() {
+ BrokerService brokerService = mock(BrokerService.class);
+ PulsarService pulsarService = mock(PulsarService.class);
+ ServiceConfiguration serviceConfiguration = mock(ServiceConfiguration.class);
+ BacklogQuotaManager backlogQuotaManager = mock(BacklogQuotaManager.class);
+ subscription = mock(AbstractSubscription.class);
+
+ when(brokerService.pulsar()).thenReturn(pulsarService);
+ when(pulsarService.getConfiguration()).thenReturn(serviceConfiguration);
+ when(brokerService.getBacklogQuotaManager()).thenReturn(backlogQuotaManager);
+
+ topic = mock(AbstractTopic.class, withSettings()
+ .useConstructor("topic", brokerService)
+ .defaultAnswer(CALLS_REAL_METHODS));
+
+ ConcurrentOpenHashMap<String, Subscription> subscriptions =
+ ConcurrentOpenHashMap.<String, Subscription>newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
+ subscriptions.put("subscription", subscription);
+ when(topic.getSubscriptions()).thenAnswer(invocation -> subscriptions);
+ }
+
+ @Test
+ public void testGetMsgOutCounter() {
+ topic.msgOutFromRemovedSubscriptions.add(1L);
+ when(subscription.getMsgOutCounter()).thenReturn(2L);
+ assertEquals(topic.getMsgOutCounter(), 3L);
+ }
+
+ @Test
+ public void testGetBytesOutCounter() {
+ topic.bytesOutFromRemovedSubscriptions.add(1L);
+ when(subscription.getBytesOutCounter()).thenReturn(2L);
+ assertEquals(topic.getBytesOutCounter(), 3L);
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerTest.java
new file mode 100644
index 00000000000..65dc4336a0e
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static java.util.Collections.emptyMap;
+import static org.apache.pulsar.client.api.MessageId.latest;
+import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Exclusive;
+import static org.apache.pulsar.common.api.proto.KeySharedMode.AUTO_SPLIT;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import java.net.SocketAddress;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.api.proto.CommandSubscribe;
+import org.apache.pulsar.common.api.proto.KeySharedMeta;
+import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class ConsumerTest {
+ private Consumer consumer;
+ private final ConsumerStatsImpl stats = new ConsumerStatsImpl();
+
+ @BeforeMethod
+ public void beforeMethod() {
+ Subscription subscription = mock(Subscription.class);
+ ServerCnx cnx = mock(ServerCnx.class);
+ SocketAddress address = mock(SocketAddress.class);
+ Topic topic = mock(Topic.class);
+ BrokerService brokerService = mock(BrokerService.class);
+ PulsarService pulsarService = mock(PulsarService.class);
+ ServiceConfiguration serviceConfiguration = mock(ServiceConfiguration.class);
+
+ when(cnx.clientAddress()).thenReturn(address);
+ when(subscription.getTopic()).thenReturn(topic);
+ when(topic.getBrokerService()).thenReturn(brokerService);
+ when(brokerService.getPulsar()).thenReturn(pulsarService);
+ when(pulsarService.getConfiguration()).thenReturn(serviceConfiguration);
+
+ consumer = new Consumer(subscription, Exclusive, "topic", 1, 0, "Cons1", 1, cnx, "myrole-1", emptyMap(), false,
+ CommandSubscribe.InitialPosition.Earliest, new KeySharedMeta().setKeySharedMode(AUTO_SPLIT), latest);
+ }
+
+ @Test
+ public void testGetMsgOutCounter() {
+ stats.msgOutCounter = 1L;
+ consumer.updateStats(stats);
+ assertEquals(consumer.getMsgOutCounter(), 1L);
+ }
+
+ @Test
+ public void testGetBytesOutCounter() {
+ stats.bytesOutCounter = 1L;
+ consumer.updateStats(stats);
+ assertEquals(consumer.getBytesOutCounter(), 1L);
+ }
+}