This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8486906 Update Producer stats on producer close() (#12500)
8486906 is described below
commit 848690621299353284932e9281e4689813835855
Author: Kai Wang <[email protected]>
AuthorDate: Wed Oct 27 00:50:47 2021 +0800
Update Producer stats on producer close() (#12500)
---
.../client/impl/ProducerStatsRecorderImpl.java | 90 +++++++++++-----------
.../client/impl/PartitionedProducerImplTest.java | 33 ++++++++
.../client/impl/ProducerStatsRecorderImplTest.java | 20 +++++
3 files changed, 100 insertions(+), 43 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
index faf73cb..6b435d6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
@@ -111,49 +111,7 @@ public class ProducerStatsRecorderImpl implements ProducerStatsRecorder {
}
try {
- long now = System.nanoTime();
- double elapsed = (now - oldTime) / 1e9;
- oldTime = now;
-
- long currentNumMsgsSent = numMsgsSent.sumThenReset();
- long currentNumBytesSent = numBytesSent.sumThenReset();
- long currentNumSendFailedMsgs = numSendFailed.sumThenReset();
- long currentNumAcksReceived = numAcksReceived.sumThenReset();
-
- totalMsgsSent.add(currentNumMsgsSent);
- totalBytesSent.add(currentNumBytesSent);
- totalSendFailed.add(currentNumSendFailedMsgs);
- totalAcksReceived.add(currentNumAcksReceived);
-
- synchronized (ds) {
- latencyPctValues = ds.getQuantiles(PERCENTILES);
- ds.reset();
- }
-
- sendMsgsRate = currentNumMsgsSent / elapsed;
- sendBytesRate = currentNumBytesSent / elapsed;
-
- if ((currentNumMsgsSent | currentNumSendFailedMsgs |
currentNumAcksReceived
- | currentNumMsgsSent) != 0) {
-
- for (int i = 0; i < latencyPctValues.length; i++) {
- if (Double.isNaN(latencyPctValues[i])) {
- latencyPctValues[i] = 0;
- }
- }
-
- log.info("[{}] [{}] Pending messages: {} --- Publish
throughput: {} msg/s --- {} Mbit/s --- "
- + "Latency: med: {} ms - 95pct: {} ms - 99pct: {}
ms - 99.9pct: {} ms - max: {} ms --- "
- + "Ack received rate: {} ack/s --- Failed
messages: {}", producer.getTopic(),
- producer.getProducerName(),
producer.getPendingQueueSize(),
- THROUGHPUT_FORMAT.format(sendMsgsRate),
- THROUGHPUT_FORMAT.format(sendBytesRate / 1024 /
1024 * 8),
- DEC.format(latencyPctValues[0]),
DEC.format(latencyPctValues[2]),
- DEC.format(latencyPctValues[3]),
DEC.format(latencyPctValues[4]),
- DEC.format(latencyPctValues[5]),
- THROUGHPUT_FORMAT.format(currentNumAcksReceived /
elapsed), currentNumSendFailedMsgs);
- }
-
+ updateStats();
} catch (Exception e) {
log.error("[{}] [{}]: {}", producer.getTopic(),
producer.getProducerName(), e.getMessage());
} finally {
@@ -171,6 +129,51 @@ public class ProducerStatsRecorderImpl implements ProducerStatsRecorder {
return statTimeout;
}
+    /**
+     * Rolls the counters accumulated since the previous call into the running totals and
+     * refreshes the derived statistics.
+     *
+     * <p>Each interval counter is drained with {@code sumThenReset()} and added to its matching
+     * total, the latency quantiles are read from {@code ds} (which is then reset), and the send
+     * rates are recomputed over the time elapsed since {@code oldTime}. A summary line is logged
+     * only when at least one message was sent, failed or acknowledged during the interval.
+     */
+    protected void updateStats() {
+        long now = System.nanoTime();
+        // Length of the interval being closed, converted from nanoseconds to seconds.
+        double elapsed = (now - oldTime) / 1e9;
+        oldTime = now;
+
+        // Drain the per-interval counters so the next call starts from zero.
+        long currentNumMsgsSent = numMsgsSent.sumThenReset();
+        long currentNumBytesSent = numBytesSent.sumThenReset();
+        long currentNumSendFailedMsgs = numSendFailed.sumThenReset();
+        long currentNumAcksReceived = numAcksReceived.sumThenReset();
+
+        totalMsgsSent.add(currentNumMsgsSent);
+        totalBytesSent.add(currentNumBytesSent);
+        totalSendFailed.add(currentNumSendFailedMsgs);
+        totalAcksReceived.add(currentNumAcksReceived);
+
+        // Read and reset the latency sketch under its own monitor.
+        synchronized (ds) {
+            latencyPctValues = ds.getQuantiles(PERCENTILES);
+            ds.reset();
+        }
+
+        sendMsgsRate = currentNumMsgsSent / elapsed;
+        sendBytesRate = currentNumBytesSent / elapsed;
+
+        // Non-zero as soon as any of the three counters is non-zero.
+        if ((currentNumMsgsSent | currentNumSendFailedMsgs | currentNumAcksReceived) != 0) {
+
+            // Replace any NaN quantile with 0 before it is formatted.
+            for (int i = 0; i < latencyPctValues.length; i++) {
+                if (Double.isNaN(latencyPctValues[i])) {
+                    latencyPctValues[i] = 0;
+                }
+            }
+
+            // Quantile indices 0, 2, 3, 4 and 5 fill the med / 95pct / 99pct / 99.9pct / max
+            // placeholders; index 1 is not part of the log line.
+            log.info("[{}] [{}] Pending messages: {} --- Publish throughput: {} msg/s --- {} Mbit/s --- "
+                            + "Latency: med: {} ms - 95pct: {} ms - 99pct: {} ms - 99.9pct: {} ms - max: {} ms --- "
+                            + "Ack received rate: {} ack/s --- Failed messages: {}", producer.getTopic(),
+                    producer.getProducerName(), producer.getPendingQueueSize(),
+                    THROUGHPUT_FORMAT.format(sendMsgsRate),
+                    THROUGHPUT_FORMAT.format(sendBytesRate / 1024 / 1024 * 8),
+                    DEC.format(latencyPctValues[0]), DEC.format(latencyPctValues[2]),
+                    DEC.format(latencyPctValues[3]), DEC.format(latencyPctValues[4]),
+                    DEC.format(latencyPctValues[5]),
+                    THROUGHPUT_FORMAT.format(currentNumAcksReceived / elapsed), currentNumSendFailedMsgs);
+        }
+    }
+
@Override
public void updateNumMsgsSent(long numMsgs, long totalMsgsSize) {
numMsgsSent.add(numMsgs);
@@ -297,6 +300,7 @@ public class ProducerStatsRecorderImpl implements ProducerStatsRecorder {
}
public void cancelStatsTimeout() {
+ this.updateStats();
if (statTimeout != null) {
statTimeout.cancel();
statTimeout = null;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
index 1f9496b..ad2c992 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
@@ -202,4 +202,37 @@ public class PartitionedProducerImplTest {
impl.getStats();
}
+    /**
+     * Checks that closing a partitioned producer folds pending per-interval counters into the
+     * totals: a send failure recorded on the single partition producer is not visible in
+     * {@code getTotalSendFailed()} before {@code close()} and is visible afterwards.
+     */
+    @Test
+    public void testGetStatsWithoutArriveUpdateInterval() throws Exception {
+        String topicName = "test-stats-without-arrive-interval";
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setServiceUrl("pulsar://localhost:6650");
+        // 100 s stats interval; presumably long enough that the periodic update never runs here.
+        conf.setStatsIntervalSeconds(100);
+
+        ThreadFactory threadFactory =
+                new DefaultThreadFactory("client-test-stats", Thread.currentThread().isDaemon());
+        EventLoopGroup eventLoopGroup = EventLoopUtil
+                .newEventLoopGroup(conf.getNumIoThreads(), false, threadFactory);
+        PulsarClientImpl clientImpl = null;
+        try {
+            clientImpl = new PulsarClientImpl(conf, eventLoopGroup);
+
+            ProducerConfigurationData producerConfData = new ProducerConfigurationData();
+            producerConfData.setMessageRoutingMode(MessageRoutingMode.CustomPartition);
+            producerConfData.setCustomMessageRouter(new CustomMessageRouter());
+
+            assertEquals(100L, clientImpl.getConfiguration().getStatsIntervalSeconds());
+
+            PartitionedProducerImpl<byte[]> impl = new PartitionedProducerImpl<>(
+                    clientImpl, topicName, producerConfData,
+                    1, null, null, null);
+
+            impl.getProducers().get(0).getStats().incrementSendFailed();
+            ProducerStatsRecorderImpl stats = impl.getStats();
+            assertEquals(stats.getTotalSendFailed(), 0);
+            // Closing the producer makes the ProducerStatsRecorder update its stats immediately.
+            impl.close();
+            stats = impl.getStats();
+            assertEquals(stats.getTotalSendFailed(), 1);
+        } finally {
+            // Release the client and its I/O threads even when an assertion fails.
+            try {
+                if (clientImpl != null) {
+                    clientImpl.close();
+                }
+            } finally {
+                eventLoopGroup.shutdownGracefully();
+            }
+        }
+    }
+
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
index d654158..f6e7f28 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
@@ -54,4 +54,24 @@ public class ProducerStatsRecorderImplTest {
Thread.sleep(1200);
assertEquals(1000.0, recorder.getSendLatencyMillisMax(), 0.5);
}
+
+    /**
+     * Checks that {@code cancelStatsTimeout()} refreshes the latency statistics: a one-second
+     * ack latency recorded just before the call is reported as the maximum send latency
+     * (about 1000 ms) right after it, with the stats interval configured to 60 seconds.
+     */
+    @Test
+    public void testGetStatsAndCancelStatsTimeoutWithoutArriveUpdateInterval() {
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setStatsIntervalSeconds(60);
+        PulsarClientImpl client = mock(PulsarClientImpl.class);
+        when(client.getConfiguration()).thenReturn(conf);
+        Timer timer = new HashedWheelTimer();
+        try {
+            when(client.timer()).thenReturn(timer);
+            ProducerImpl<?> producer = mock(ProducerImpl.class);
+            when(producer.getTopic()).thenReturn("topic-test");
+            when(producer.getProducerName()).thenReturn("producer-test");
+            when(producer.getPendingQueueSize()).thenReturn(1);
+            ProducerConfigurationData producerConfigurationData = new ProducerConfigurationData();
+            ProducerStatsRecorderImpl recorder =
+                    new ProducerStatsRecorderImpl(client, producerConfigurationData, producer);
+            long latencyNs = TimeUnit.SECONDS.toNanos(1);
+            recorder.incrementNumAcksReceived(latencyNs);
+            recorder.cancelStatsTimeout();
+            assertEquals(1000.0, recorder.getSendLatencyMillisMax(), 0.5);
+        } finally {
+            // Stop the timer so its worker thread does not outlive the test.
+            timer.stop();
+        }
+    }
}