This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 47f9041bbd9 [fix] Avoid redelivering duplicated messages when batching
is enabled (#18486)
47f9041bbd9 is described below
commit 47f9041bbd99cbc3ee6b0bcc0e0c6e158325071b
Author: Yunze Xu <[email protected]>
AuthorDate: Mon Nov 21 17:43:01 2022 +0800
[fix] Avoid redelivering duplicated messages when batching is enabled
(#18486)
https://github.com/apache/pulsar/pull/18454 fixed a potential message
loss when a batched message is redelivered and one single message of the batch
is added to the ACK tracker. However, it also introduced potential message
duplication; see the `testConsumerDedup` test modified by #18454.
The root cause is that single messages are still passed into the
`isDuplicate` method in `receiveIndividualMessagesFromBatch`. However, in this
case, the `MessageId` is a `BatchMessageIdImpl`, while the `MessageId`s in
`lastCumulativeAck` and `pendingIndividualAcks` are plain `MessageIdImpl`
instances, so they never match.
Validate the class type in `isDuplicate` and convert a
`BatchMessageIdImpl` to a `MessageIdImpl` before the lookup. Then revert the
unnecessary changes in #18454.
`ConsumerRedeliveryTest#testAckNotSent` is added to verify the fix.
The duplication could still happen when batch index ACK is enabled, because
even after the ACK tracker is flushed, if a batched message is only partially
acknowledged, the whole batched message would still be redelivered. I will
open another PR to fix it.
<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
- [ ] `doc` <!-- Your PR contains doc changes. Please attach the local
preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR
description, or else your PR might not get merged. -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update
later -->
- [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->
PR in forked repository: https://github.com/BewareMyPower/pulsar/pull/8
(cherry picked from commit be1d07e16119fafbe489315ab68c4751bf03c31d)
---
.../pulsar/client/api/ConsumerRedeliveryTest.java | 73 ++++++++++++++++++++--
.../impl/ConsumerDedupPermitsUpdateTest.java | 21 ++-----
.../PersistentAcknowledgmentsGroupingTracker.java | 32 ++++++++--
.../pulsar/client/impl/LastCumulativeAckTest.java | 20 ++++++
4 files changed, 121 insertions(+), 25 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
index 95e343acfef..6521b24f188 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
@@ -25,11 +25,12 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import lombok.Cleanup;
-
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,9 +38,7 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-
import com.google.common.collect.Sets;
-
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
@@ -69,6 +68,19 @@ public class ConsumerRedeliveryTest extends
ProducerConsumerBase {
return new Object[][] { { true }, { false } };
}
+ @DataProvider(name = "batchedMessageAck")
+ public Object[][] batchedMessageAck() {
+ // When batch index ack is disabled (by default), only after all
single messages were sent would the pending
+ // ACK be added into the ACK tracker.
+ return new Object[][] {
+ // numAcked, batchSize, ack type
+ { 3, 5, CommandAck.AckType.Individual },
+ { 5, 5, CommandAck.AckType.Individual },
+ { 3, 5, CommandAck.AckType.Cumulative },
+ { 5, 5, CommandAck.AckType.Cumulative }
+ };
+ }
+
/**
* It verifies that redelivered messages are sorted based on the
ledger-ids.
* <pre>
@@ -301,4 +313,57 @@ public class ConsumerRedeliveryTest extends
ProducerConsumerBase {
consumer.close();
producer.close();
}
+
+ @Test(timeOut = 30000, dataProvider = "batchedMessageAck")
+ public void testAckNotSent(int numAcked, int batchSize, CommandAck.AckType
ackType) throws Exception {
+ String topic = "persistent://my-property/my-ns/test-ack-not-sent-"
+ + numAcked + "-" + batchSize + "-" + ackType.getValue();
+ @Cleanup Consumer<String> consumer =
pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("sub")
+ .enableBatchIndexAcknowledgment(false)
+ .acknowledgmentGroupTime(1, TimeUnit.HOURS) // ACK won't be
sent
+ .subscribe();
+ @Cleanup Producer<String> producer =
pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .enableBatching(true)
+ .batchingMaxMessages(batchSize)
+ .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+ .create();
+ for (int i = 0; i < batchSize; i++) {
+ String value = "msg-" + i;
+ producer.sendAsync(value).thenAccept(id -> log.info("{} was sent
to {}", value, id));
+ }
+ List<Message<String>> messages = new ArrayList<>();
+ for (int i = 0; i < batchSize; i++) {
+ messages.add(consumer.receive());
+ }
+ if (ackType == CommandAck.AckType.Individual) {
+ for (int i = 0; i < numAcked; i++) {
+ consumer.acknowledge(messages.get(i));
+ }
+ } else {
+ consumer.acknowledgeCumulative(messages.get(numAcked - 1));
+ }
+
+ consumer.redeliverUnacknowledgedMessages();
+
+ messages.clear();
+ for (int i = 0; i < batchSize; i++) {
+ Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
+ if (msg == null) {
+ break;
+ }
+ log.info("Received {} from {}", msg.getValue(),
msg.getMessageId());
+ messages.add(msg);
+ }
+ List<String> values =
messages.stream().map(Message::getValue).collect(Collectors.toList());
+ // All messages are redelivered because only if the whole batch are
acknowledged would the message ID be
+ // added into the ACK tracker.
+ if (numAcked < batchSize) {
+ assertEquals(values, IntStream.range(0, batchSize).mapToObj(i ->
"msg-" + i).collect(Collectors.toList()));
+ } else {
+ assertTrue(values.isEmpty());
+ }
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java
index ceb7d7fd484..4c9922acbec 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java
@@ -116,23 +116,10 @@ public class ConsumerDedupPermitsUpdateTest extends
ProducerConsumerBase {
}
producer.flush();
- if (batchingEnabled) {
- for (int i = 0; i < 30; i++) {
- Message<String> msg = consumer.receive();
- assertEquals(msg.getValue(), "hello-" + i);
- consumer.acknowledge(msg);
- }
- for (int i = 0; i < 30; i++) {
- Message<String> msg = consumer.receive();
- assertEquals(msg.getValue(), "new-message-" + i);
- consumer.acknowledge(msg);
- }
- } else {
- for (int i = 0; i < 30; i++) {
- Message<String> msg = consumer.receive();
- assertEquals(msg.getValue(), "new-message-" + i);
- consumer.acknowledge(msg);
- }
+ for (int i = 0; i < 30; i++) {
+ Message<String> msg = consumer.receive();
+ assertEquals(msg.getValue(), "new-message-" + i);
+ consumer.acknowledge(msg);
}
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index 283fd93a84a..bf8fe3c1a26 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -110,12 +110,18 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
*/
@Override
public boolean isDuplicate(MessageId messageId) {
- final MessageIdImpl messageIdOfLastAck =
lastCumulativeAck.getMessageId();
- if (messageIdOfLastAck != null &&
messageId.compareTo(messageIdOfLastAck) <= 0) {
+ if (!(messageId instanceof MessageIdImpl)) {
+ throw new IllegalArgumentException("isDuplicated cannot accept "
+ + messageId.getClass().getName() + ": " + messageId);
+ }
+ if (lastCumulativeAck.compareTo(messageId) >= 0) {
// Already included in a cumulative ack
return true;
} else {
- return pendingIndividualAcks.contains((MessageIdImpl) messageId);
+ final MessageIdImpl messageIdImpl = (messageId instanceof
BatchMessageIdImpl)
+ ? ((BatchMessageIdImpl) messageId).toMessageIdImpl()
+ : (MessageIdImpl) messageId;
+ return pendingIndividualAcks.contains(messageIdImpl);
}
}
@@ -650,7 +656,7 @@ class LastCumulativeAck {
private boolean flushRequired = false;
public synchronized void update(final MessageIdImpl messageId, final
BitSetRecyclable bitSetRecyclable) {
- if (messageId.compareTo(this.messageId) > 0) {
+ if (compareTo(messageId) < 0) {
if (this.bitSetRecyclable != null && this.bitSetRecyclable !=
bitSetRecyclable) {
this.bitSetRecyclable.recycle();
}
@@ -684,6 +690,24 @@ class LastCumulativeAck {
flushRequired = false;
}
+ public synchronized int compareTo(MessageId messageId) {
+ if (this.messageId instanceof BatchMessageIdImpl && (!(messageId
instanceof BatchMessageIdImpl))) {
+ final BatchMessageIdImpl lhs = (BatchMessageIdImpl) this.messageId;
+ final MessageIdImpl rhs = (MessageIdImpl) messageId;
+ return MessageIdImpl.messageIdCompare(
+ lhs.getLedgerId(), lhs.getEntryId(),
lhs.getPartitionIndex(), lhs.getBatchIndex(),
+ rhs.getLedgerId(), rhs.getEntryId(),
rhs.getPartitionIndex(), Integer.MAX_VALUE);
+ } else if (messageId instanceof BatchMessageIdImpl &&
(!(this.messageId instanceof BatchMessageIdImpl))){
+ final MessageIdImpl lhs = this.messageId;
+ final BatchMessageIdImpl rhs = (BatchMessageIdImpl) messageId;
+ return MessageIdImpl.messageIdCompare(
+ lhs.getLedgerId(), lhs.getEntryId(),
lhs.getPartitionIndex(), Integer.MAX_VALUE,
+ rhs.getLedgerId(), rhs.getEntryId(),
rhs.getPartitionIndex(), rhs.getBatchIndex());
+ } else {
+ return this.messageId.compareTo(messageId);
+ }
+ }
+
private synchronized void set(final MessageIdImpl messageId, final
BitSetRecyclable bitSetRecyclable) {
this.messageId = messageId;
this.bitSetRecyclable = bitSetRecyclable;
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java
index 102ccfc0e07..5a46b855342 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java
@@ -83,4 +83,24 @@ public class LastCumulativeAckTest {
assertEquals(lastCumulativeAckToFlush.getBitSetRecyclable(),
bitSetRecyclable);
}
+ @Test
+ public void testCompareTo() {
+ LastCumulativeAck lastCumulativeAck = new LastCumulativeAck();
+ lastCumulativeAck.update(new MessageIdImpl(0L, 1L, -1), null);
+
+ assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 0L, -1))
> 0);
+ assertEquals(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 1L,
-1)), 0);
+ assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 2L, -1))
< 0);
+ assertTrue(lastCumulativeAck.compareTo(new BatchMessageIdImpl(0L, 1L,
-1, 0)) > 0);
+
+ lastCumulativeAck = new LastCumulativeAck();
+ lastCumulativeAck.update(new BatchMessageIdImpl(0L, 1L, -1, 1), null);
+
+ assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 0L, -1))
> 0);
+ assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 1L, -1))
< 0);
+ assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 2L, -1))
< 0);
+ assertTrue(lastCumulativeAck.compareTo(new BatchMessageIdImpl(0L, 1L,
-1, 0)) > 0);
+ assertTrue(lastCumulativeAck.compareTo(new BatchMessageIdImpl(0L, 1L,
-1, 2)) < 0);
+ assertEquals(lastCumulativeAck.compareTo(new BatchMessageIdImpl(0L,
1L, -1, 1)), 0);
+ }
}