codelipenghui commented on code in PR #17032:
URL: https://github.com/apache/pulsar/pull/17032#discussion_r945023402


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java:
##########
@@ -220,20 +223,36 @@ public void sendReachedEndOfTopic(long consumerId) {
     }
 
     @Override
-    public ChannelPromise sendMessagesToConsumer(long consumerId, String topicName, Subscription subscription,
+    public ChannelPromise sendMessagesToConsumer(Consumer consumer, String topicName, Subscription subscription,
                                                  int partitionIdx, List<? extends Entry> entries,
                                                  EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks,
                                                  RedeliveryTracker redeliveryTracker, long epoch) {
         final ChannelHandlerContext ctx = cnx.ctx();
         final ChannelPromise writePromise = ctx.newPromise();
+        final long consumerId = consumer.consumerId();
         ctx.channel().eventLoop().execute(() -> {
+            ManagedCursorImpl cursor = null;
+            if (subscription instanceof PersistentSubscription persistentSubscription
+                    && !(subscription instanceof CompactorSubscription)
+                    && !consumer.readCompacted()) {
+                cursor = (ManagedCursorImpl) persistentSubscription.getCursor();
+            }
+
             for (int i = 0; i < entries.size(); i++) {
                 Entry entry = entries.get(i);
                 if (entry == null) {
                     // Entry was filtered out
                     continue;
                 }
 
+                // Filter out already delete entry, because message ack may be already has changed
+                if (cursor != null && cursor.isDurable()) {
+                  if (cursor.isMessageDeleted(entry.getPosition())) {
+                      entry.release();
+                      continue;
+                  }

Review Comment:
   The available permits were already decremented before this point, but here
   the entry is skipped without being sent. This might introduce a case where
   the consumer can't receive any messages, because its available permits on
   the broker side are 0.
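
To make the concern concrete, here is a minimal sketch of the permit accounting. It is a simplified model, not Pulsar code: `PermitSketch`, `reserve`, `send`, and the `refundSkipped` flag are all hypothetical names, and it assumes the client only grants new permits after it has received messages. Refunding the permits of skipped entries is shown as one possible remedy, not necessarily the one this PR should adopt.

```java
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of broker-side permit accounting.
// None of these names are Pulsar APIs.
final class PermitSketch {
    private int availablePermits;
    private int delivered;

    PermitSketch(int initialPermits) {
        this.availablePermits = initialPermits;
    }

    // Dispatcher side: permits are decremented when entries are read for the consumer.
    void reserve(int count) {
        availablePermits -= count;
    }

    // Sender side: entries that are already deleted (acked) are skipped.
    // With refundSkipped == true, the permit of each skipped entry is handed back.
    void send(List<Long> entryIds, Set<Long> deleted, boolean refundSkipped) {
        for (long id : entryIds) {
            if (deleted.contains(id)) {
                if (refundSkipped) {
                    availablePermits++;
                }
                continue;
            }
            delivered++;
        }
    }

    int availablePermits() {
        return availablePermits;
    }

    int delivered() {
        return delivered;
    }

    public static void main(String[] args) {
        List<Long> entries = List.of(1L, 2L, 3L);
        Set<Long> deleted = Set.of(1L, 2L, 3L); // every entry was acked in the meantime

        PermitSketch withoutRefund = new PermitSketch(3);
        withoutRefund.reserve(entries.size());
        withoutRefund.send(entries, deleted, false);
        System.out.println("no refund: permits=" + withoutRefund.availablePermits()
                + " delivered=" + withoutRefund.delivered());

        PermitSketch withRefund = new PermitSketch(3);
        withRefund.reserve(entries.size());
        withRefund.send(entries, deleted, true);
        System.out.println("refund:    permits=" + withRefund.availablePermits()
                + " delivered=" + withRefund.delivered());
    }
}
```

In the first case the consumer ends up with 0 permits and 0 delivered messages, so under the stated assumption nothing prompts the client to grant more permits. In the second case the permits are restored and the dispatcher can read again.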



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
