This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2c09ff2 [pulsar-storm] add more metrics to troubleshoot spout
throughput (#4280)
2c09ff2 is described below
commit 2c09ff2f8e519b96a34eda391b450133b68cd5aa
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Wed May 15 03:05:37 2019 -0700
[pulsar-storm] add more metrics to troubleshoot spout throughput (#4280)
### Motivation
Many time user sees lower throughput in pulsar-spout even though standalone
consumer can consume such msgRate easily. It would be hard to debug user's
topology without enough information so, adding two metrics which can impact
spout throughput.
- number of message filed: spout sleeps when it sees failed message so,
it's important to have visibility of that count
- number of times spout-thread not found the message in queue: spout
topology internally sleeps if it doesn't see any emitted tuple in collector
after triggering `nextTuple()` api.
This metrics gives more visibility about consumer throughput.
---
.../src/main/java/org/apache/pulsar/storm/PulsarSpout.java | 11 ++++++++++-
1 file changed, 10 insertions(+), 1 deletion(-)
diff --git
a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
index bbfe5cd..5a5ea59 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
@@ -58,6 +58,8 @@ public class PulsarSpout extends BaseRichSpout implements
IMetric {
public static final String NO_OF_PENDING_FAILED_MESSAGES =
"numberOfPendingFailedMessages";
public static final String NO_OF_MESSAGES_RECEIVED =
"numberOfMessagesReceived";
public static final String NO_OF_MESSAGES_EMITTED =
"numberOfMessagesEmitted";
+ public static final String NO_OF_MESSAGES_FAILED =
"numberOfMessagesFailed";
+ public static final String MESSAGE_NOT_AVAILABLE_COUNT =
"messageNotAvailableCount";
public static final String NO_OF_PENDING_ACKS = "numberOfPendingAcks";
public static final String CONSUMER_RATE = "consumerRate";
public static final String CONSUMER_THROUGHPUT_BYTES =
"consumerThroughput";
@@ -78,6 +80,8 @@ public class PulsarSpout extends BaseRichSpout implements
IMetric {
private Consumer<byte[]> consumer;
private volatile long messagesReceived = 0;
private volatile long messagesEmitted = 0;
+ private volatile long messagesFailed = 0;
+ private volatile long messageNotAvailableCount = 0;
private volatile long pendingAcks = 0;
private volatile long messageSizeReceived = 0;
@@ -157,7 +161,7 @@ public class PulsarSpout extends BaseRichSpout implements
IMetric {
pendingMessageRetries.putIfAbsent(id, messageRetries);
failedMessages.add(msg);
--pendingAcks;
-
+ messagesFailed++;
} else {
LOG.warn("[{}] Number of retries limit reached, dropping the
message {}", spoutId, id);
ack(msg);
@@ -203,6 +207,7 @@ public class PulsarSpout extends BaseRichSpout implements
IMetric {
} else {
// queue is empty and nothing to emit
done = true;
+ messageNotAvailableCount++;
}
}
} catch (PulsarClientException e) {
@@ -334,6 +339,8 @@ public class PulsarSpout extends BaseRichSpout implements
IMetric {
metricsMap.put(NO_OF_PENDING_FAILED_MESSAGES, (long)
pendingMessageRetries.size());
metricsMap.put(NO_OF_MESSAGES_RECEIVED, messagesReceived);
metricsMap.put(NO_OF_MESSAGES_EMITTED, messagesEmitted);
+ metricsMap.put(NO_OF_MESSAGES_FAILED, messagesFailed);
+ metricsMap.put(MESSAGE_NOT_AVAILABLE_COUNT, messageNotAvailableCount);
metricsMap.put(NO_OF_PENDING_ACKS, pendingAcks);
metricsMap.put(CONSUMER_RATE, ((double) messagesReceived) /
pulsarSpoutConf.getMetricsTimeIntervalInSecs());
metricsMap.put(CONSUMER_THROUGHPUT_BYTES,
@@ -345,6 +352,8 @@ public class PulsarSpout extends BaseRichSpout implements
IMetric {
messagesReceived = 0;
messagesEmitted = 0;
messageSizeReceived = 0;
+ messagesFailed = 0;
+ messageNotAvailableCount = 0;
}
@SuppressWarnings("rawtypes")