jai1 commented on a change in pull request #3991: [pulsar-storm] Fix NPE while emitting next tuple
URL: https://github.com/apache/pulsar/pull/3991#discussion_r272763031
 
 

 ##########
 File path: pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
 ##########
 @@ -215,6 +204,36 @@ public void emitNextAvailableTuple() {
         }
     }
 
+    private boolean emitFailedMessage() {
+        Message<byte[]> msg;
+
+        while ((msg = failedMessages.peek()) != null) {
+            MessageRetries messageRetries = pendingMessageRetries.get(msg.getMessageId());
+            if (messageRetries != null) {
+                // emit the tuple if retry doesn't need backoff else sleep with backoff time and return without doing
+                // anything
+                if (Backoff.shouldBackoff(messageRetries.getTimeStamp(), TimeUnit.NANOSECONDS,
+                        messageRetries.getNumRetries(), clientConf.getDefaultBackoffIntervalNanos(),
+                        clientConf.getMaxBackoffIntervalNanos())) {
+                    Utils.sleep(TimeUnit.NANOSECONDS.toMillis(clientConf.getDefaultBackoffIntervalNanos()));
+                } else {
+                    // remove the message from the queue and emit to the topology, only if it should not be backedoff
+                    LOG.info("[{}] Retrying failed message {}", spoutId, msg.getMessageId());
+                    failedMessages.remove();
+                    mapToValueAndEmit(msg);
+                }
+                return true;
+            }
+
+            // messageRetries is null because messageRetries is already acked and removed from pendingMessageRetries
+            // then remove it from failed message queue as well.
+            failedMessages.remove();
 
 Review comment:
   Add a debug log here, so that there is a trace to consult when people report that their messages were not delivered or redelivered.
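
   A minimal, self-contained sketch of what such a log could look like. It is not the PR's code: `StaleRetryDrain`, `dropStaleHeads`, the `String` message ids and the log wording are all hypothetical stand-ins, and it uses `java.util.logging` only so that it runs without extra dependencies. In the spout itself this would presumably be a single debug-level call on the existing `LOG`, placed just before `failedMessages.remove()`.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical stand-in for the spout's failed-message bookkeeping. */
public class StaleRetryDrain {
    private static final Logger LOG = Logger.getLogger(StaleRetryDrain.class.getName());

    // Message ids waiting to be retried (stands in for failedMessages).
    final Queue<String> failedMessages = new ArrayDeque<>();
    // Retry counts keyed by message id (stands in for pendingMessageRetries).
    final Map<String, Integer> pendingMessageRetries = new HashMap<>();

    /** Drops queue heads that have no retry state left; returns how many were dropped. */
    int dropStaleHeads(String spoutId) {
        int dropped = 0;
        String msgId;
        while ((msgId = failedMessages.peek()) != null
                && !pendingMessageRetries.containsKey(msgId)) {
            // The log requested in the review: record why this message is skipped.
            LOG.log(Level.FINE, "[{0}] Dropping failed message {1}: already acked, no retry state",
                    new Object[] {spoutId, msgId});
            failedMessages.remove();
            dropped++;
        }
        return dropped;
    }

    public static void main(String[] args) {
        StaleRetryDrain drain = new StaleRetryDrain();
        drain.failedMessages.add("msg-1");           // already acked: no retry entry
        drain.failedMessages.add("msg-2");           // still pending a retry
        drain.pendingMessageRetries.put("msg-2", 1);
        System.out.println(drain.dropStaleHeads("spout-0"));
        System.out.println(drain.failedMessages.peek());
    }
}
```

   Logging at debug (here `FINE`) level keeps the normal output quiet while still leaving a record of every message that was dropped from the retry queue instead of being re-emitted.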
