This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 07c46fe9ef4 [improve][broker] Optimise msgOutCounter and bytesOutCounter (#16214) (#16286)
07c46fe9ef4 is described below

commit 07c46fe9ef42a72de18be3267e70a51bbc883678
Author: Dave Maughan <[email protected]>
AuthorDate: Fri Jul 1 04:05:58 2022 +0100

    [improve][broker] Optimise msgOutCounter and bytesOutCounter (#16214) (#16286)
    
    ### Motivation
    
    AbstractTopic#getBytesOutCounter and AbstractTopic#getMsgOutCounter
    each build the full topic stats (via getStats) only to read a single
    counter from the result. These getters can be optimised to perform
    only the work necessary to calculate that one counter.
    
    ### Modification
    
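    Provided a scoped implementation of each of the above getters that
    computes only the counter it needs: the total retained from removed
    subscriptions plus the sum over the current subscriptions, where each
    subscription adds the total retained from its removed consumers to the
    sum over its current consumers. The shared counter fields are moved up
    into AbstractTopic and a new AbstractSubscription base class.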
---
 .../broker/service/AbstractSubscription.java       | 42 ++++++++++++
 .../pulsar/broker/service/AbstractTopic.java       | 17 ++++-
 .../org/apache/pulsar/broker/service/Consumer.java |  8 +++
 .../nonpersistent/NonPersistentSubscription.java   |  6 +-
 .../service/nonpersistent/NonPersistentTopic.java  |  4 --
 .../service/persistent/PersistentSubscription.java |  7 +-
 .../broker/service/persistent/PersistentTopic.java |  4 --
 .../broker/service/AbstractSubscriptionTest.java   | 57 ++++++++++++++++
 .../pulsar/broker/service/AbstractTopicTest.java   | 75 +++++++++++++++++++++
 .../apache/pulsar/broker/service/ConsumerTest.java | 76 ++++++++++++++++++++++
 10 files changed, 277 insertions(+), 19 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractSubscription.java
new file mode 100644
index 00000000000..6a386670556
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractSubscription.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.util.Optional;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.ToLongFunction;
+
+public abstract class AbstractSubscription implements Subscription {
+    protected final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
+    protected final LongAdder msgOutFromRemovedConsumer = new LongAdder();
+
+    public long getMsgOutCounter() {
+        return msgOutFromRemovedConsumer.longValue() + 
sumConsumers(Consumer::getMsgOutCounter);
+    }
+
+    public long getBytesOutCounter() {
+        return bytesOutFromRemovedConsumers.longValue() + 
sumConsumers(Consumer::getBytesOutCounter);
+    }
+
+    private long sumConsumers(ToLongFunction<Consumer> toCounter) {
+        return Optional.ofNullable(getDispatcher())
+                .map(dispatcher -> 
dispatcher.getConsumers().stream().mapToLong(toCounter).sum())
+                .orElse(0L);
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 19116d60566..acf68bd2ae8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.ToLongFunction;
 import lombok.Getter;
 import org.apache.bookkeeper.mledger.util.StatsBuckets;
 import org.apache.commons.collections4.CollectionUtils;
@@ -141,6 +142,9 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
 
     private Map<String/*subscription*/, SubscriptionPolicies> 
subscriptionPolicies = Collections.emptyMap();
 
+    protected final LongAdder msgOutFromRemovedSubscriptions = new LongAdder();
+    protected final LongAdder bytesOutFromRemovedSubscriptions = new 
LongAdder();
+
     public AbstractTopic(String topic, BrokerService brokerService) {
         this.topic = topic;
         this.brokerService = brokerService;
@@ -1133,11 +1137,20 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
     }
 
     public long getMsgOutCounter() {
-        return getStats(false, false, false).msgOutCounter;
+        return msgOutFromRemovedSubscriptions.longValue()
+                + sumSubscriptions(AbstractSubscription::getMsgOutCounter);
     }
 
     public long getBytesOutCounter() {
-        return getStats(false, false, false).bytesOutCounter;
+        return bytesOutFromRemovedSubscriptions.longValue()
+                + sumSubscriptions(AbstractSubscription::getBytesOutCounter);
+    }
+
+    private long sumSubscriptions(ToLongFunction<AbstractSubscription> 
toCounter) {
+        return getSubscriptions().values().stream()
+                .map(AbstractSubscription.class::cast)
+                .mapToLong(toCounter)
+                .sum();
     }
 
     public boolean isDeleteWhileInactive() {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index bedaabf28d1..8d63a283046 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -798,6 +798,14 @@ public class Consumer {
         return stats;
     }
 
+    public long getMsgOutCounter() {
+        return msgOutCounter.longValue();
+    }
+
+    public long getBytesOutCounter() {
+        return bytesOutCounter.longValue();
+    }
+
     public int getUnackedMessages() {
         return unackedMessages;
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index fed3b0f851a..c3399bda4bd 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -24,11 +24,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.concurrent.atomic.LongAdder;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
+import org.apache.pulsar.broker.service.AbstractSubscription;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
@@ -48,7 +48,7 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class NonPersistentSubscription implements Subscription {
+public class NonPersistentSubscription extends AbstractSubscription implements 
Subscription {
     private final NonPersistentTopic topic;
     private volatile NonPersistentDispatcher dispatcher;
     private final String topicName;
@@ -66,8 +66,6 @@ public class NonPersistentSubscription implements 
Subscription {
     // Timestamp of when this subscription was last seen active
     private volatile long lastActive;
 
-    private final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
-    private final LongAdder msgOutFromRemovedConsumer = new LongAdder();
     private volatile Map<String, String> subscriptionProperties;
 
     // If isDurable is false(such as a Reader), remove subscription from topic 
when closing this subscription.
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index ebca481c738..a9deda64340 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -38,7 +38,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-import java.util.concurrent.atomic.LongAdder;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.broker.PulsarServerException;
@@ -107,9 +106,6 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
             AtomicLongFieldUpdater.newUpdater(NonPersistentTopic.class, 
"entriesAddedCounter");
     private volatile long entriesAddedCounter = 0;
 
-    private final LongAdder bytesOutFromRemovedSubscriptions = new LongAdder();
-    private final LongAdder msgOutFromRemovedSubscriptions = new LongAdder();
-
     private static final FastThreadLocal<TopicStats> threadLocalTopicStats = 
new FastThreadLocal<TopicStats>() {
         @Override
         protected TopicStats initialValue() {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 5c4cd624be3..0dc38f9bc2e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -30,7 +30,6 @@ import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.concurrent.atomic.LongAdder;
 import java.util.stream.Collectors;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
@@ -52,6 +51,7 @@ import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
+import org.apache.pulsar.broker.service.AbstractSubscription;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
@@ -82,7 +82,7 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class PersistentSubscription implements Subscription {
+public class PersistentSubscription extends AbstractSubscription implements 
Subscription {
     protected final PersistentTopic topic;
     protected final ManagedCursor cursor;
     protected volatile Dispatcher dispatcher;
@@ -116,9 +116,6 @@ public class PersistentSubscription implements Subscription 
{
     private final PendingAckHandle pendingAckHandle;
     private volatile Map<String, String> subscriptionProperties;
 
-    private final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
-    private final LongAdder msgOutFromRemovedConsumer = new LongAdder();
-
     static {
         
REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES.put(REPLICATED_SUBSCRIPTION_PROPERTY, 
1L);
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 5457f815b98..9e8604dd28b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -47,7 +47,6 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.LongAdder;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 import lombok.Getter;
@@ -222,9 +221,6 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     @Getter
     protected final TransactionBuffer transactionBuffer;
 
-    private final LongAdder bytesOutFromRemovedSubscriptions = new LongAdder();
-    private final LongAdder msgOutFromRemovedSubscriptions = new LongAdder();
-
     // Record the last time a data message (ie: not an internal Pulsar marker) 
is published on the topic
     private long lastDataMessagePublishedTimestamp = 0;
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractSubscriptionTest.java
new file mode 100644
index 00000000000..aaf1f6164a3
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractSubscriptionTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import java.util.List;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class AbstractSubscriptionTest {
+    private Consumer consumer;
+    private AbstractSubscription subscription;
+
+    @BeforeMethod
+    public void beforeMethod() {
+        Dispatcher dispatcher = mock(Dispatcher.class);
+        consumer = mock(Consumer.class);
+        subscription = spy(AbstractSubscription.class);
+
+        when(subscription.getDispatcher()).thenReturn(dispatcher);
+        when(dispatcher.getConsumers()).thenReturn(List.of(consumer));
+    }
+
+    @Test
+    public void testGetMsgOutCounter() {
+        subscription.msgOutFromRemovedConsumer.add(1L);
+        when(consumer.getMsgOutCounter()).thenReturn(2L);
+        assertEquals(subscription.getMsgOutCounter(), 3L);
+    }
+
+    @Test
+    public void testGetBytesOutCounter() {
+        subscription.bytesOutFromRemovedConsumers.add(1L);
+        when(consumer.getBytesOutCounter()).thenReturn(2L);
+        assertEquals(subscription.getBytesOutCounter(), 3L);
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractTopicTest.java
new file mode 100644
index 00000000000..fb7890dc57f
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractTopicTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.withSettings;
+import static org.testng.Assert.assertEquals;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class AbstractTopicTest {
+    private AbstractSubscription subscription;
+    private AbstractTopic topic;
+
+    @BeforeMethod
+    public void beforeMethod() {
+        BrokerService brokerService = mock(BrokerService.class);
+        PulsarService pulsarService = mock(PulsarService.class);
+        ServiceConfiguration serviceConfiguration = 
mock(ServiceConfiguration.class);
+        BacklogQuotaManager backlogQuotaManager = 
mock(BacklogQuotaManager.class);
+        subscription = mock(AbstractSubscription.class);
+
+        when(brokerService.pulsar()).thenReturn(pulsarService);
+        
when(pulsarService.getConfiguration()).thenReturn(serviceConfiguration);
+        
when(brokerService.getBacklogQuotaManager()).thenReturn(backlogQuotaManager);
+
+        topic = mock(AbstractTopic.class, withSettings()
+                .useConstructor("topic", brokerService)
+                .defaultAnswer(CALLS_REAL_METHODS));
+
+        ConcurrentOpenHashMap<String, Subscription> subscriptions =
+                ConcurrentOpenHashMap.<String, Subscription>newBuilder()
+                        .expectedItems(16)
+                        .concurrencyLevel(1)
+                        .build();
+        subscriptions.put("subscription", subscription);
+        when(topic.getSubscriptions()).thenAnswer(invocation -> subscriptions);
+    }
+
+    @Test
+    public void testGetMsgOutCounter() {
+        topic.msgOutFromRemovedSubscriptions.add(1L);
+        when(subscription.getMsgOutCounter()).thenReturn(2L);
+        assertEquals(topic.getMsgOutCounter(), 3L);
+    }
+
+    @Test
+    public void testGetBytesOutCounter() {
+        topic.bytesOutFromRemovedSubscriptions.add(1L);
+        when(subscription.getBytesOutCounter()).thenReturn(2L);
+        assertEquals(topic.getBytesOutCounter(), 3L);
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerTest.java
new file mode 100644
index 00000000000..1ad0642d9c5
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerTest.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static java.util.Collections.emptyMap;
+import static org.apache.pulsar.client.api.MessageId.latest;
+import static 
org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Exclusive;
+import static org.apache.pulsar.common.api.proto.KeySharedMode.AUTO_SPLIT;
+import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import java.net.SocketAddress;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.api.proto.KeySharedMeta;
+import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class ConsumerTest {
+    private Consumer consumer;
+    private final ConsumerStatsImpl stats = new ConsumerStatsImpl();
+
+    @BeforeMethod
+    public void beforeMethod() {
+        Subscription subscription = mock(Subscription.class);
+        ServerCnx cnx = mock(ServerCnx.class);
+        SocketAddress address = mock(SocketAddress.class);
+        Topic topic = mock(Topic.class);
+        BrokerService brokerService = mock(BrokerService.class);
+        PulsarService pulsarService = mock(PulsarService.class);
+        ServiceConfiguration serviceConfiguration = 
mock(ServiceConfiguration.class);
+
+        when(cnx.clientAddress()).thenReturn(address);
+        when(subscription.getTopic()).thenReturn(topic);
+        when(topic.getBrokerService()).thenReturn(brokerService);
+        when(brokerService.getPulsar()).thenReturn(pulsarService);
+        
when(pulsarService.getConfiguration()).thenReturn(serviceConfiguration);
+
+        consumer =
+                new Consumer(subscription, Exclusive, "topic", 1, 0, "Cons1", 
true, cnx, "myrole-1", emptyMap(), false,
+                        new KeySharedMeta().setKeySharedMode(AUTO_SPLIT), 
latest, DEFAULT_CONSUMER_EPOCH);
+    }
+
+    @Test
+    public void testGetMsgOutCounter() {
+        stats.msgOutCounter = 1L;
+        consumer.updateStats(stats);
+        assertEquals(consumer.getMsgOutCounter(), 1L);
+    }
+
+    @Test
+    public void testGetBytesOutCounter() {
+        stats.bytesOutCounter = 1L;
+        consumer.updateStats(stats);
+        assertEquals(consumer.getBytesOutCounter(), 1L);
+    }
+}

Reply via email to