This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new b3e34f6a839 [fix] Avoid redelivering duplicated messages when batching 
is enabled (#18486)
b3e34f6a839 is described below

commit b3e34f6a839fe01ffc531eb4ee6b9b8687aee61b
Author: Yunze Xu <[email protected]>
AuthorDate: Mon Nov 21 17:43:01 2022 +0800

    [fix] Avoid redelivering duplicated messages when batching is enabled 
(#18486)
    
    https://github.com/apache/pulsar/pull/18454 fixed a potential message
loss when a batched message is redelivered and one single message of the batch
is added to the ACK tracker. However, it also introduced potential message
duplication; see the `testConsumerDedup` test modified by #18454.
    
    The root cause is that single messages are still passed into the
`isDuplicate` method in `receiveIndividualMessagesFromBatch`. However, in this
case, the `MessageId` is a `BatchMessageIdImpl`, while the `MessageId`s stored in
`lastCumulativeAck` and `pendingIndividualAcks` are `MessageIdImpl`
instances, so single messages of an already-acknowledged batch are not recognized as duplicates.
    
    Validate the class type in `isDuplicate` and convert a
`BatchMessageIdImpl` to a `MessageIdImpl` before the lookup. Then revert the unnecessary changes
in #18454.
    
    `ConsumerRedeliveryTest#testAckNotSent` is added to verify the fix.
    
    Duplication can still happen when batch index ACK is enabled, because
even after the ACK tracker is flushed, if some single messages of a batched message are
not acknowledged, the whole batched message is still redelivered. I will
open another PR to fix it.
    
    <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
    
    - [ ] `doc` <!-- Your PR contains doc changes. Please attach the local 
preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR 
description, or else your PR might not get merged. -->
    - [ ] `doc-required` <!-- Your PR changes impact docs and you will update 
later -->
    - [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
    - [ ] `doc-complete` <!-- Docs have been already added -->
    
    PR in forked repository: https://github.com/BewareMyPower/pulsar/pull/8
    
    (cherry picked from commit be1d07e16119fafbe489315ab68c4751bf03c31d)
---
 .../pulsar/client/api/ConsumerRedeliveryTest.java  | 73 ++++++++++++++++++++--
 .../impl/ConsumerDedupPermitsUpdateTest.java       | 21 ++-----
 .../PersistentAcknowledgmentsGroupingTracker.java  | 32 ++++++++--
 .../pulsar/client/impl/LastCumulativeAckTest.java  | 20 ++++++
 4 files changed, 121 insertions(+), 25 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
index 95e343acfef..6521b24f188 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
@@ -25,11 +25,12 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import lombok.Cleanup;
-
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.api.proto.CommandAck;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,9 +38,7 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
-
 import com.google.common.collect.Sets;
-
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
@@ -69,6 +68,19 @@ public class ConsumerRedeliveryTest extends 
ProducerConsumerBase {
         return new Object[][] { { true }, { false } };
     }
 
+    @DataProvider(name = "batchedMessageAck")
+    public Object[][] batchedMessageAck() {
+        // When batch index ack is disabled (by default), only after all 
single messages were sent would the pending
+        // ACK be added into the ACK tracker.
+        return new Object[][] {
+                // numAcked, batchSize, ack type
+                { 3, 5, CommandAck.AckType.Individual },
+                { 5, 5, CommandAck.AckType.Individual },
+                { 3, 5, CommandAck.AckType.Cumulative },
+                { 5, 5, CommandAck.AckType.Cumulative }
+        };
+    }
+
     /**
      * It verifies that redelivered messages are sorted based on the 
ledger-ids.
      * <pre>
@@ -301,4 +313,57 @@ public class ConsumerRedeliveryTest extends 
ProducerConsumerBase {
         consumer.close();
         producer.close();
     }
+
+    @Test(timeOut = 30000, dataProvider = "batchedMessageAck")
+    public void testAckNotSent(int numAcked, int batchSize, CommandAck.AckType 
ackType) throws Exception {
+        String topic = "persistent://my-property/my-ns/test-ack-not-sent-"
+                + numAcked + "-" + batchSize + "-" + ackType.getValue();
+        @Cleanup Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("sub")
+                .enableBatchIndexAcknowledgment(false)
+                .acknowledgmentGroupTime(1, TimeUnit.HOURS) // ACK won't be 
sent
+                .subscribe();
+        @Cleanup Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .enableBatching(true)
+                .batchingMaxMessages(batchSize)
+                .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+                .create();
+        for (int i = 0; i < batchSize; i++) {
+            String value = "msg-" + i;
+            producer.sendAsync(value).thenAccept(id -> log.info("{} was sent 
to {}", value, id));
+        }
+        List<Message<String>> messages = new ArrayList<>();
+        for (int i = 0; i < batchSize; i++) {
+            messages.add(consumer.receive());
+        }
+        if (ackType == CommandAck.AckType.Individual) {
+            for (int i = 0; i < numAcked; i++) {
+                consumer.acknowledge(messages.get(i));
+            }
+        } else {
+            consumer.acknowledgeCumulative(messages.get(numAcked - 1));
+        }
+
+        consumer.redeliverUnacknowledgedMessages();
+
+        messages.clear();
+        for (int i = 0; i < batchSize; i++) {
+            Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
+            if (msg == null) {
+                break;
+            }
+            log.info("Received {} from {}", msg.getValue(), 
msg.getMessageId());
+            messages.add(msg);
+        }
+        List<String> values = 
messages.stream().map(Message::getValue).collect(Collectors.toList());
+        // All messages are redelivered because only if the whole batch are 
acknowledged would the message ID be
+        // added into the ACK tracker.
+        if (numAcked < batchSize) {
+            assertEquals(values, IntStream.range(0, batchSize).mapToObj(i -> 
"msg-" + i).collect(Collectors.toList()));
+        } else {
+            assertTrue(values.isEmpty());
+        }
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java
index ceb7d7fd484..4c9922acbec 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java
@@ -116,23 +116,10 @@ public class ConsumerDedupPermitsUpdateTest extends 
ProducerConsumerBase {
         }
         producer.flush();
 
-        if (batchingEnabled) {
-            for (int i = 0; i < 30; i++) {
-                Message<String> msg = consumer.receive();
-                assertEquals(msg.getValue(), "hello-" + i);
-                consumer.acknowledge(msg);
-            }
-            for (int i = 0; i < 30; i++) {
-                Message<String> msg = consumer.receive();
-                assertEquals(msg.getValue(), "new-message-" + i);
-                consumer.acknowledge(msg);
-            }
-        } else {
-            for (int i = 0; i < 30; i++) {
-                Message<String> msg = consumer.receive();
-                assertEquals(msg.getValue(), "new-message-" + i);
-                consumer.acknowledge(msg);
-            }
+        for (int i = 0; i < 30; i++) {
+            Message<String> msg = consumer.receive();
+            assertEquals(msg.getValue(), "new-message-" + i);
+            consumer.acknowledge(msg);
         }
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index 283fd93a84a..bf8fe3c1a26 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -110,12 +110,18 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
      */
     @Override
     public boolean isDuplicate(MessageId messageId) {
-        final MessageIdImpl messageIdOfLastAck = 
lastCumulativeAck.getMessageId();
-        if (messageIdOfLastAck != null && 
messageId.compareTo(messageIdOfLastAck) <= 0) {
+        if (!(messageId instanceof MessageIdImpl)) {
+            throw new IllegalArgumentException("isDuplicated cannot accept "
+                    + messageId.getClass().getName() + ": " + messageId);
+        }
+        if (lastCumulativeAck.compareTo(messageId) >= 0) {
             // Already included in a cumulative ack
             return true;
         } else {
-            return pendingIndividualAcks.contains((MessageIdImpl) messageId);
+            final MessageIdImpl messageIdImpl = (messageId instanceof 
BatchMessageIdImpl)
+                    ? ((BatchMessageIdImpl) messageId).toMessageIdImpl()
+                    : (MessageIdImpl) messageId;
+            return pendingIndividualAcks.contains(messageIdImpl);
         }
     }
 
@@ -650,7 +656,7 @@ class LastCumulativeAck {
     private boolean flushRequired = false;
 
     public synchronized void update(final MessageIdImpl messageId, final 
BitSetRecyclable bitSetRecyclable) {
-        if (messageId.compareTo(this.messageId) > 0) {
+        if (compareTo(messageId) < 0) {
             if (this.bitSetRecyclable != null && this.bitSetRecyclable != 
bitSetRecyclable) {
                 this.bitSetRecyclable.recycle();
             }
@@ -684,6 +690,24 @@ class LastCumulativeAck {
         flushRequired = false;
     }
 
+    public synchronized int compareTo(MessageId messageId) {
+        if (this.messageId instanceof BatchMessageIdImpl && (!(messageId 
instanceof BatchMessageIdImpl))) {
+            final BatchMessageIdImpl lhs = (BatchMessageIdImpl) this.messageId;
+            final MessageIdImpl rhs = (MessageIdImpl) messageId;
+            return MessageIdImpl.messageIdCompare(
+                    lhs.getLedgerId(), lhs.getEntryId(), 
lhs.getPartitionIndex(), lhs.getBatchIndex(),
+                    rhs.getLedgerId(), rhs.getEntryId(), 
rhs.getPartitionIndex(), Integer.MAX_VALUE);
+        } else if (messageId instanceof BatchMessageIdImpl && 
(!(this.messageId instanceof BatchMessageIdImpl))){
+            final MessageIdImpl lhs = this.messageId;
+            final BatchMessageIdImpl rhs = (BatchMessageIdImpl) messageId;
+            return MessageIdImpl.messageIdCompare(
+                    lhs.getLedgerId(), lhs.getEntryId(), 
lhs.getPartitionIndex(), Integer.MAX_VALUE,
+                    rhs.getLedgerId(), rhs.getEntryId(), 
rhs.getPartitionIndex(), rhs.getBatchIndex());
+        } else {
+            return this.messageId.compareTo(messageId);
+        }
+    }
+
     private synchronized void set(final MessageIdImpl messageId, final 
BitSetRecyclable bitSetRecyclable) {
         this.messageId = messageId;
         this.bitSetRecyclable = bitSetRecyclable;
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java
index 102ccfc0e07..5a46b855342 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java
@@ -83,4 +83,24 @@ public class LastCumulativeAckTest {
         assertEquals(lastCumulativeAckToFlush.getBitSetRecyclable(), 
bitSetRecyclable);
     }
 
+    @Test
+    public void testCompareTo() {
+        LastCumulativeAck lastCumulativeAck = new LastCumulativeAck();
+        lastCumulativeAck.update(new MessageIdImpl(0L, 1L, -1), null);
+
+        assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 0L, -1)) 
> 0);
+        assertEquals(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 1L, 
-1)), 0);
+        assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 2L, -1)) 
< 0);
+        assertTrue(lastCumulativeAck.compareTo(new BatchMessageIdImpl(0L, 1L, 
-1, 0)) > 0);
+
+        lastCumulativeAck = new LastCumulativeAck();
+        lastCumulativeAck.update(new BatchMessageIdImpl(0L, 1L, -1, 1), null);
+
+        assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 0L, -1)) 
> 0);
+        assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 1L, -1)) 
< 0);
+        assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 2L, -1)) 
< 0);
+        assertTrue(lastCumulativeAck.compareTo(new BatchMessageIdImpl(0L, 1L, 
-1, 0)) > 0);
+        assertTrue(lastCumulativeAck.compareTo(new BatchMessageIdImpl(0L, 1L, 
-1, 2)) < 0);
+        assertEquals(lastCumulativeAck.compareTo(new BatchMessageIdImpl(0L, 
1L, -1, 1)), 0);
+    }
 }

Reply via email to