jai1 commented on a change in pull request #3991: [pulsar-storm] Fix NPE while
emitting next tuple
URL: https://github.com/apache/pulsar/pull/3991#discussion_r272763031
##########
File path: pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
##########
@@ -215,6 +204,36 @@ public void emitNextAvailableTuple() {
}
}
+ private boolean emitFailedMessage() {
+ Message<byte[]> msg;
+
+ while ((msg = failedMessages.peek()) != null) {
+ MessageRetries messageRetries =
pendingMessageRetries.get(msg.getMessageId());
+ if (messageRetries != null) {
+ // emit the tuple if retry doesn't need backoff else sleep
with backoff time and return without doing
+ // anything
+ if (Backoff.shouldBackoff(messageRetries.getTimeStamp(),
TimeUnit.NANOSECONDS,
+ messageRetries.getNumRetries(),
clientConf.getDefaultBackoffIntervalNanos(),
+ clientConf.getMaxBackoffIntervalNanos())) {
+
Utils.sleep(TimeUnit.NANOSECONDS.toMillis(clientConf.getDefaultBackoffIntervalNanos()));
+ } else {
+ // remove the message from the queue and emit to the
topology, only if it should not be backedoff
+ LOG.info("[{}] Retrying failed message {}", spoutId,
msg.getMessageId());
+ failedMessages.remove();
+ mapToValueAndEmit(msg);
+ }
+ return true;
+ }
+
+ // messageRetries is null because messageRetries is already acked
and removed from pendingMessageRetries
+ // then remove it from failed message queue as well.
+ failedMessages.remove();
Review comment:
Add a debug log here, in case people complain that their messages are not
delivered or redelivered.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services