eolivelli commented on code in PR #15391:
URL: https://github.com/apache/pulsar/pull/15391#discussion_r867357016


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java:
##########
@@ -227,6 +236,15 @@ && trackDelayedDelivery(entry.getLedgerId(), 
entry.getEntryId(), msgMetadata)) {
                 ((AbstractTopic) topic).addFilteredEntriesCount(filtered);
             }
         }
+        if (CollectionUtils.isNotEmpty(entriesToRedeliver)) {
+            
this.subscription.getTopic().getBrokerService().getPulsar().getExecutor()
+                    .schedule(() -> {
+                        // simulate the Consumer rejected the message
+                        subscription
+                                .redeliverUnacknowledgedMessages(consumer, 
entriesToRedeliver);
+                    }, 1, TimeUnit.SECONDS);
+
+        }

Review Comment:
   with "Single active consume" the outcome it the same, in that case we are 
deferring the processing of the message (like when the consumer negatively acks 
the message on the client because the message cannot currently be processed due 
to some reason).
   
   Unfortunately the EntryFilter cannot return a value for the delay period 
without breaking the API (we only have a enum)
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to