jai1 commented on a change in pull request #3991: [pulsar-storm] Fix NPE while emitting next tuple URL: https://github.com/apache/pulsar/pull/3991#discussion_r272763031
########## File path: pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java ########## @@ -215,6 +204,36 @@ public void emitNextAvailableTuple() { } } + private boolean emitFailedMessage() { + Message<byte[]> msg; + + while ((msg = failedMessages.peek()) != null) { + MessageRetries messageRetries = pendingMessageRetries.get(msg.getMessageId()); + if (messageRetries != null) { + // emit the tuple if retry doesn't need backoff else sleep with backoff time and return without doing + // anything + if (Backoff.shouldBackoff(messageRetries.getTimeStamp(), TimeUnit.NANOSECONDS, + messageRetries.getNumRetries(), clientConf.getDefaultBackoffIntervalNanos(), + clientConf.getMaxBackoffIntervalNanos())) { + Utils.sleep(TimeUnit.NANOSECONDS.toMillis(clientConf.getDefaultBackoffIntervalNanos())); + } else { + // remove the message from the queue and emit to the topology, only if it should not be backedoff + LOG.info("[{}] Retrying failed message {}", spoutId, msg.getMessageId()); + failedMessages.remove(); + mapToValueAndEmit(msg); + } + return true; + } + + // messageRetries is null because messageRetries is already acked and removed from pendingMessageRetries + // then remove it from failed message queue as well. + failedMessages.remove(); Review comment: Add a debug log here, in case people complain that their messages are not delivered or redelivered. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services