This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 26f9ffa497e [improve] [client] Add api to get producer/consumer stats
for partition topic (#18212)
26f9ffa497e is described below
commit 26f9ffa497e16396ea4dbbcad3452b2a973f86ac
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Tue Nov 1 12:57:11 2022 -0700
[improve] [client] Add api to get producer/consumer stats for partition
topic (#18212)
* [improve] [client] Add api to get producer/consumer stats for partition
topic
* introduce partition topic stats interface
---
.../client/api/SimpleProducerConsumerStatTest.java | 61 +++++++++++++++++
.../pulsar/client/api/MultiTopicConsumerStats.java | 39 +++++++++++
.../client/api/PartitionedTopicProducerStats.java | 40 +++++++++++
.../apache/pulsar/client/api/ProducerStats.java | 1 -
.../impl/MultiTopicConsumerStatsRecorderImpl.java | 59 ++++++++++++++++
.../client/impl/MultiTopicsConsumerImpl.java | 6 +-
.../client/impl/PartitionedProducerImpl.java | 9 ++-
.../PartitionedTopicProducerStatsRecorderImpl.java | 79 ++++++++++++++++++++++
.../pulsar/client/impl/ProducerStatsDisabled.java | 1 +
.../client/impl/ProducerStatsRecorderImpl.java | 17 +----
.../client/impl/ProducerStatsRecorderImplTest.java | 4 +-
11 files changed, 292 insertions(+), 24 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
index ad849395045..bae891ad46a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.api;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
@@ -30,6 +31,7 @@ import com.google.gson.JsonObject;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
@@ -449,4 +451,63 @@ public class SimpleProducerConsumerStatTest extends
ProducerConsumerBase {
.until(() -> producer.getStats().getPendingQueueSize() ==
numMessages);
assertEquals(producer.getStats().getPendingQueueSize(), numMessages);
}
+
+ /**
+ * This test verifies partitioned topic stats for producer and consumer.
+ * @throws Exception
+ */
+ @Test
+ public void testPartitionTopicStats() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ String topicName =
"persistent://my-property/my-ns/testPartitionTopicStats";
+ int numPartitions = 10;
+ admin.topics().createPartitionedTopic(topicName, numPartitions);
+
+ ConsumerBuilder<byte[]> consumerBuilder =
pulsarClient.newConsumer().topic(topicName)
+ .subscriptionName("my-subscriber-name");
+
+ Consumer<byte[]> consumer = consumerBuilder.subscribe();
+
+ ProducerBuilder<byte[]> producerBuilder =
pulsarClient.newProducer().enableBatching(false).topic(topicName);
+
+ Producer<byte[]> producer = producerBuilder.create();
+
+ int numMessages = 20;
+ for (int i = 0; i < numMessages; i++) {
+ String message = "my-message-" + i;
+ producer.send(message.getBytes());
+ }
+
+ Message<byte[]> msg = null;
+ Set<String> messageSet = new HashSet<>();
+ for (int i = 0; i < numMessages; i++) {
+ msg = consumer.receive(5, TimeUnit.SECONDS);
+ String receivedMessage = new String(msg.getData());
+ log.info("Received message: [{}]", receivedMessage);
+ String expectedMessage = "my-message-" + i;
+ testMessageOrderAndDuplicates(messageSet, receivedMessage,
expectedMessage);
+ }
+ // Acknowledge the consumption of all messages at once
+ consumer.acknowledgeCumulative(msg);
+
+ MultiTopicConsumerStats cStat = (MultiTopicConsumerStats)
consumer.getStats();
+ PartitionedTopicProducerStats pStat = (PartitionedTopicProducerStats)
producer.getStats();
+ retryStrategically((test) -> !pStat.getPartitionStats().isEmpty(), 5,
100);
+ retryStrategically((test) -> !cStat.getPartitionStats().isEmpty(), 5,
100);
+ Map<String, ProducerStats> prodStatsMap = pStat.getPartitionStats();
+ Map<String, ConsumerStats> consStatsMap = cStat.getPartitionStats();
+ assertFalse(prodStatsMap.isEmpty());
+ assertFalse(consStatsMap.isEmpty());
+ for (int i = 0; i < numPartitions; i++) {
+ String topic = topicName + "-partition-" + i;
+ assertTrue(prodStatsMap.containsKey(topic));
+ assertTrue(consStatsMap.containsKey(topic));
+ }
+
+ consumer.close();
+ producer.close();
+
+ log.info("-- Exiting {} test --", methodName);
+ }
}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MultiTopicConsumerStats.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MultiTopicConsumerStats.java
new file mode 100644
index 00000000000..e1c1d3372c3
--- /dev/null
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MultiTopicConsumerStats.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.util.Map;
+import org.apache.pulsar.common.classification.InterfaceAudience;
+import org.apache.pulsar.common.classification.InterfaceStability;
+
+/**
+ * Multi-topic Consumer statistics recorded by client.
+ *
+ * <p>All the stats are relative to the last recording period. The interval of
the stats refreshes is configured with
+ * {@link ClientBuilder#statsInterval(long, java.util.concurrent.TimeUnit)}
with a default of 1 minute.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public interface MultiTopicConsumerStats extends ConsumerStats {
+
+ /**
+ * @return stats for each partition if topic is partitioned topic
+ */
+ Map<String, ConsumerStats> getPartitionStats();
+}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PartitionedTopicProducerStats.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PartitionedTopicProducerStats.java
new file mode 100644
index 00000000000..5fd3c1f34a0
--- /dev/null
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PartitionedTopicProducerStats.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.util.Map;
+import org.apache.pulsar.common.classification.InterfaceAudience;
+import org.apache.pulsar.common.classification.InterfaceStability;
+
+/**
+ * Partitioned topic Producer statistics recorded by client.
+ *
+ * <p>All the stats are relative to the last recording period. The interval of
the stats refreshes is configured with
+ * {@link ClientBuilder#statsInterval(long, java.util.concurrent.TimeUnit)}
with a default of 1 minute.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public interface PartitionedTopicProducerStats extends ProducerStats {
+
+ /**
+ * @return stats for each partition if topic is partitioned topic
+ */
+ Map<String, ProducerStats> getPartitionStats();
+
+}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerStats.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerStats.java
index 573d23611b6..fd5b87c6639 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerStats.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerStats.java
@@ -115,5 +115,4 @@ public interface ProducerStats extends Serializable {
* @return current pending send-message queue size of the producer
*/
int getPendingQueueSize();
-
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicConsumerStatsRecorderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicConsumerStatsRecorderImpl.java
new file mode 100644
index 00000000000..17018be02be
--- /dev/null
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicConsumerStatsRecorderImpl.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerStats;
+import org.apache.pulsar.client.api.MultiTopicConsumerStats;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MultiTopicConsumerStatsRecorderImpl extends
ConsumerStatsRecorderImpl implements MultiTopicConsumerStats {
+
+ private static final long serialVersionUID = 1L;
+ private Map<String, ConsumerStats> partitionStats = new
ConcurrentHashMap<>();
+
+ public MultiTopicConsumerStatsRecorderImpl() {
+ super();
+ }
+
+ public MultiTopicConsumerStatsRecorderImpl(Consumer<?> consumer) {
+ super(consumer);
+ }
+
+ public MultiTopicConsumerStatsRecorderImpl(PulsarClientImpl pulsarClient,
ConsumerConfigurationData<?> conf,
+ Consumer<?> consumer) {
+ super(pulsarClient, conf, consumer);
+ }
+
+ public void updateCumulativeStats(String partition, ConsumerStats stats) {
+ super.updateCumulativeStats(stats);
+ partitionStats.put(partition, stats);
+ }
+
+ @Override
+ public Map<String, ConsumerStats> getPartitionStats() {
+ return partitionStats;
+ }
+
+ private static final Logger log =
LoggerFactory.getLogger(MultiTopicConsumerStatsRecorderImpl.class);
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 6306578008a..65385a32ac2 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -95,7 +95,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
private volatile Timeout partitionsAutoUpdateTimeout = null;
TopicsPartitionChangedListener topicsPartitionChangedListener;
CompletableFuture<Void> partitionsAutoUpdateFuture = null;
- private final ConsumerStatsRecorder stats;
+ private final MultiTopicConsumerStatsRecorderImpl stats;
private UnAckedMessageTracker unAckedMessageTracker;
private final ConsumerConfigurationData<T> internalConfig;
@@ -156,7 +156,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
this.internalConfig = getInternalConsumerConfig();
this.stats = client.getConfiguration().getStatsIntervalSeconds() > 0
- ? new ConsumerStatsRecorderImpl(this)
+ ? new MultiTopicConsumerStatsRecorderImpl(this)
: null;
// start track and auto subscribe partition increment
@@ -826,7 +826,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
}
stats.reset();
- consumers.values().stream().forEach(consumer ->
stats.updateCumulativeStats(consumer.getStats()));
+ consumers.forEach((partition, consumer) ->
stats.updateCumulativeStats(partition, consumer.getStats()));
return stats;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 56f40433faf..f780edc95c1 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -62,7 +62,7 @@ public class PartitionedProducerImpl<T> extends
ProducerBase<T> {
private final ConcurrentOpenHashMap<Integer, ProducerImpl<T>> producers;
private final MessageRouter routerPolicy;
- private final ProducerStatsRecorderImpl stats;
+ private final PartitionedTopicProducerStatsRecorderImpl stats;
private TopicMetadata topicMetadata;
private final int firstPartitionIndex;
private String overrideProducerName;
@@ -80,7 +80,9 @@ public class PartitionedProducerImpl<T> extends
ProducerBase<T> {
ConcurrentOpenHashMap.<Integer,
ProducerImpl<T>>newBuilder().build();
this.topicMetadata = new TopicMetadataImpl(numPartitions);
this.routerPolicy = getMessageRouter();
- stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new
ProducerStatsRecorderImpl() : null;
+ stats = client.getConfiguration().getStatsIntervalSeconds() > 0
+ ? new PartitionedTopicProducerStatsRecorderImpl()
+ : null;
// MaxPendingMessagesAcrossPartitions doesn't support partial
partition such as SinglePartition correctly
int maxPendingMessages = Math.min(conf.getMaxPendingMessages(),
@@ -353,7 +355,8 @@ public class PartitionedProducerImpl<T> extends
ProducerBase<T> {
return null;
}
stats.reset();
- producers.values().forEach(p ->
stats.updateCumulativeStats(p.getStats()));
+ producers.forEach(
+ (partition, producer) ->
stats.updateCumulativeStats(producer.getTopic(), producer.getStats()));
return stats;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedTopicProducerStatsRecorderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedTopicProducerStatsRecorderImpl.java
new file mode 100644
index 00000000000..2f73a6af406
--- /dev/null
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedTopicProducerStatsRecorderImpl.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.DoubleAdder;
+import org.apache.pulsar.client.api.PartitionedTopicProducerStats;
+import org.apache.pulsar.client.api.ProducerStats;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PartitionedTopicProducerStatsRecorderImpl extends
ProducerStatsRecorderImpl
+ implements PartitionedTopicProducerStats {
+
+ private static final long serialVersionUID = 1L;
+ private Map<String, ProducerStats> partitionStats = Collections.emptyMap();
+ private final DoubleAdder sendMsgsRateAggregate;
+ private final DoubleAdder sendBytesRateAggregate;
+ private int partitions = 0;
+
+ public PartitionedTopicProducerStatsRecorderImpl() {
+ super();
+ partitionStats = new ConcurrentHashMap<>();
+ sendMsgsRateAggregate = new DoubleAdder();
+ sendBytesRateAggregate = new DoubleAdder();
+ }
+
+ void reset() {
+ super.reset();
+ partitions = 0;
+ }
+
+ void updateCumulativeStats(String partition, ProducerStats stats) {
+ super.updateCumulativeStats(stats);
+ if (stats == null) {
+ return;
+ }
+ partitionStats.put(partition, stats);
+ // update rates
+ sendMsgsRateAggregate.add(stats.getSendMsgsRate());
+ sendBytesRateAggregate.add(stats.getSendBytesRate());
+ partitions++;
+ }
+
+ @Override
+ public double getSendMsgsRate() {
+ return sendMsgsRateAggregate.doubleValue() / partitions;
+ }
+
+ @Override
+ public double getSendBytesRate() {
+ return sendBytesRateAggregate.doubleValue() / partitions;
+ }
+
+ @Override
+ public Map<String, ProducerStats> getPartitionStats() {
+ return partitionStats;
+ }
+
+ private static final Logger log =
LoggerFactory.getLogger(PartitionedTopicProducerStatsRecorderImpl.class);
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsDisabled.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsDisabled.java
index f2a63d91c4a..365cdf7ede4 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsDisabled.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsDisabled.java
@@ -132,4 +132,5 @@ public class ProducerStatsDisabled implements
ProducerStatsRecorder {
public int getPendingQueueSize() {
return 0;
}
+
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
index a7e541e5de1..01ed84f5503 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
@@ -27,7 +27,6 @@ import io.netty.util.TimerTask;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.DoubleAdder;
import java.util.concurrent.atomic.LongAdder;
import org.apache.pulsar.client.api.ProducerStats;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
@@ -51,8 +50,6 @@ public class ProducerStatsRecorderImpl implements
ProducerStatsRecorder {
private final LongAdder totalBytesSent;
private final LongAdder totalSendFailed;
private final LongAdder totalAcksReceived;
- private final DoubleAdder sendMsgsRateAggregate;
- private final DoubleAdder sendBytesRateAggregate;
private static final DecimalFormat DEC = new DecimalFormat("0.000");
private static final DecimalFormat THROUGHPUT_FORMAT = new
DecimalFormat("0.00");
private final transient DoublesSketch ds;
@@ -61,7 +58,6 @@ public class ProducerStatsRecorderImpl implements
ProducerStatsRecorder {
private volatile double sendMsgsRate;
private volatile double sendBytesRate;
- private int partitions = 0;
private volatile double[] latencyPctValues = new
double[PERCENTILES.length];
private volatile double[] batchSizePctValues = new
double[PERCENTILES.length];
private volatile double[] msgSizePctValues = new
double[PERCENTILES.length];
@@ -77,8 +73,6 @@ public class ProducerStatsRecorderImpl implements
ProducerStatsRecorder {
totalBytesSent = new LongAdder();
totalSendFailed = new LongAdder();
totalAcksReceived = new LongAdder();
- sendMsgsRateAggregate = new DoubleAdder();
- sendBytesRateAggregate = new DoubleAdder();
ds = DoublesSketch.builder().build(256);
batchSizeDs = DoublesSketch.builder().build(256);
msgSizeDs = DoublesSketch.builder().build(256);
@@ -97,8 +91,6 @@ public class ProducerStatsRecorderImpl implements
ProducerStatsRecorder {
totalBytesSent = new LongAdder();
totalSendFailed = new LongAdder();
totalAcksReceived = new LongAdder();
- sendMsgsRateAggregate = new DoubleAdder();
- sendBytesRateAggregate = new DoubleAdder();
ds = DoublesSketch.builder().build(256);
batchSizeDs = DoublesSketch.builder().build(256);
msgSizeDs = DoublesSketch.builder().build(256);
@@ -247,7 +239,6 @@ public class ProducerStatsRecorderImpl implements
ProducerStatsRecorder {
totalBytesSent.reset();
totalSendFailed.reset();
totalAcksReceived.reset();
- partitions = 0;
}
void updateCumulativeStats(ProducerStats stats) {
@@ -262,10 +253,6 @@ public class ProducerStatsRecorderImpl implements
ProducerStatsRecorder {
totalBytesSent.add(stats.getTotalBytesSent());
totalSendFailed.add(stats.getTotalSendFailed());
totalAcksReceived.add(stats.getTotalAcksReceived());
- // update rates
- sendMsgsRateAggregate.add(stats.getSendMsgsRate());
- sendBytesRateAggregate.add(stats.getSendBytesRate());
- partitions++;
}
@Override
@@ -306,12 +293,12 @@ public class ProducerStatsRecorderImpl implements
ProducerStatsRecorder {
@Override
public double getSendMsgsRate() {
- return partitions != 0 ? sendMsgsRateAggregate.doubleValue() /
partitions : sendMsgsRate;
+ return sendMsgsRate;
}
@Override
public double getSendBytesRate() {
- return partitions != 0 ? sendBytesRateAggregate.doubleValue() /
partitions : sendBytesRate;
+ return sendBytesRate;
}
@Override
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
index 28f47105f86..0b4379e087f 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
@@ -80,10 +80,10 @@ public class ProducerStatsRecorderImplTest {
@Test
public void testPartitionTopicAggegationStats() {
ProducerStatsRecorderImpl recorder1 = spy(new
ProducerStatsRecorderImpl());
- ProducerStatsRecorderImpl recorder2 = new ProducerStatsRecorderImpl();
+ PartitionedTopicProducerStatsRecorderImpl recorder2 = new
PartitionedTopicProducerStatsRecorderImpl();
when(recorder1.getSendMsgsRate()).thenReturn(1000.0);
when(recorder1.getSendBytesRate()).thenReturn(1000.0);
- recorder2.updateCumulativeStats(recorder1);
+ recorder2.updateCumulativeStats("test", recorder1);
assertTrue(recorder2.getSendBytesRate() > 0);
assertTrue(recorder2.getSendMsgsRate() > 0);
}