codelipenghui commented on a change in pull request #7299:
URL: https://github.com/apache/pulsar/pull/7299#discussion_r451542303



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
##########
@@ -70,6 +74,56 @@
     public ReplicatedSubscriptionsController(PersistentTopic topic, String localCluster) {
         this.topic = topic;
         this.localCluster = localCluster;
+
+        this.publishContext = new Topic.PublishContext() {
+            @Override
+            public void completed(Exception e, long ledgerId, long entryId) {
+                // Nothing to do in case of publish errors since the retry logic is applied upstream
+                // after a snapshot is not closed
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Published marker at {}:{}. Exception: {}", topic.getName(), ledgerId, entryId, e);
+                }
+
+                if (e == null) {
+                    // Acknowledge the marker message to prevent it from accumulating in the backlog
+                    Position position = new PositionImpl(ledgerId, entryId);
+                    topic.getSubscriptions().forEach((subName, sub) -> {
+                        if (sub != null) {
+                            sub.acknowledgeMessage(Collections.singletonList(position), AckType.Individual,
+                                    Collections.emptyMap());
+                        }
+                    });
+                }
+            }
+        };
+
+        this.publishContextForSnapshot = new Topic.PublishContext() {
+            @Override
+            public void completed(Exception e, long ledgerId, long entryId) {
+                // Nothing to do in case of publish errors since the retry logic is applied upstream
+                // after a snapshot is not closed
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Published snapshot at {}:{}. Exception: {}", topic.getName(), ledgerId, entryId, e);
+                }
+
+                if (e == null) {
+                    // If no consumers are connected, or if they are connected but cannot receive any messages
+                    // (i.e. the dispatcher is not reading new entries), acknowledge the snapshot message
+                    // to prevent it from accumulating in the backlog
+                    Position position = new PositionImpl(ledgerId, entryId);
+                    topic.getSubscriptions().forEach((subName, sub) -> {
+                        if (sub != null) {
+                            Dispatcher dispatcher = sub.getDispatcher();
+                            if (dispatcher == null || !dispatcher.isAtleastOneConsumerAvailable()) {
+                                sub.acknowledgeMessage(Collections.singletonList(position), AckType.Individual,
+                                        Collections.emptyMap());
+                            }

Review comment:
       > Is there any other solution to this issue?
   
   I don't have a better approach either. This looks okay; it is only an extreme case.
   
   > If there are no subscriptions on the topic, we don't need to ack since the messages will not accumulate in the backlog, do we?
   
   Makes sense.
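
   To spell out the condition being discussed, here is a minimal sketch. `Sub`, `Disp`, and `subscriptionsToAck` are hypothetical stand-ins, not the Pulsar classes; the sketch only models which subscriptions would have the snapshot acknowledged, and shows that an empty subscription map leads to no acks at all:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SnapshotAckSketch {
    // Hypothetical stand-in for a dispatcher; NOT the Pulsar Dispatcher interface.
    interface Disp {
        boolean hasAvailableConsumer();
    }

    // Hypothetical stand-in for a subscription; its dispatcher may be null
    // when no consumer has ever connected.
    static final class Sub {
        final Disp dispatcher;

        Sub(Disp dispatcher) {
            this.dispatcher = dispatcher;
        }
    }

    // Returns the names of the subscriptions for which the snapshot message
    // would be acknowledged: those with no dispatcher or no available consumer.
    static List<String> subscriptionsToAck(Map<String, Sub> subs) {
        List<String> result = new ArrayList<>();
        subs.forEach((name, sub) -> {
            if (sub == null) {
                return; // mirrors the null check in the diff
            }
            Disp d = sub.dispatcher;
            if (d == null || !d.hasAvailableConsumer()) {
                result.add(name);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, Sub> subs = new LinkedHashMap<>();
        subs.put("idle", new Sub(null));           // no dispatcher yet
        subs.put("stalled", new Sub(() -> false)); // consumers cannot receive
        subs.put("active", new Sub(() -> true));   // will consume the snapshot itself
        System.out.println(subscriptionsToAck(subs));                  // prints [idle, stalled]
        System.out.println(subscriptionsToAck(new LinkedHashMap<>())); // prints []
    }
}
```

   With no subscriptions the loop body never runs, which matches the point quoted above that nothing needs to be acked in that case.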




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

