This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e81a20d667a [fix][broker] Avoid consumers receiving acknowledged
messages from compacted topic after reconnection (#21187)
e81a20d667a is described below
commit e81a20d667aef7c0f888e88dbcf972196012ebea
Author: Cong Zhao <[email protected]>
AuthorDate: Fri Jan 26 17:26:45 2024 +0800
[fix][broker] Avoid consumers receiving acknowledged messages from
compacted topic after reconnection (#21187)
---
.../apache/bookkeeper/mledger/ManagedCursor.java | 4 +
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 10 +-
.../PersistentDispatcherSingleActiveConsumer.java | 24 ++-
.../broker/service/persistent/PersistentTopic.java | 5 +-
.../pulsar/compaction/CompactedTopicImpl.java | 6 +-
.../broker/service/ReplicatorSubscriptionTest.java | 2 +
.../pulsar/broker/transaction/TransactionTest.java | 1 +
.../org/apache/pulsar/client/impl/ReaderTest.java | 28 ++++
.../apache/pulsar/compaction/CompactionTest.java | 164 ++++++++++++++++++++-
9 files changed, 229 insertions(+), 15 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index 68f1840afd8..8372592c851 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -517,6 +517,10 @@ public interface ManagedCursor {
*/
void rewind();
+ default void rewind(boolean readCompacted) {
+ rewind();
+ }
+
/**
* Move the cursor to a different read position.
*
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 03f0b24a635..8555753d98b 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -682,7 +682,7 @@ public class ManagedCursorImpl implements ManagedCursor {
LedgerHandle recoveredFromCursorLedger) {
// if the position was at a ledger that didn't exist (since it will be
deleted if it was previously empty),
// we need to move to the next existing ledger
- if (!ledger.ledgerExists(position.getLedgerId())) {
+ if (position.getEntryId() == -1L &&
!ledger.ledgerExists(position.getLedgerId())) {
Long nextExistingLedger =
ledger.getNextValidLedger(position.getLedgerId());
if (nextExistingLedger == null) {
log.info("[{}] [{}] Couldn't find next next valid ledger for
recovery {}", ledger.getName(), name,
@@ -2522,9 +2522,15 @@ public class ManagedCursorImpl implements ManagedCursor {
@Override
public void rewind() {
+ rewind(false);
+ }
+
+ @Override
+ public void rewind(boolean readCompacted) {
lock.writeLock().lock();
try {
- PositionImpl newReadPosition =
ledger.getNextValidPosition(markDeletePosition);
+ PositionImpl newReadPosition =
+ readCompacted ? markDeletePosition.getNext() :
ledger.getNextValidPosition(markDeletePosition);
PositionImpl oldReadPosition = readPosition;
log.info("[{}-{}] Rewind from {} to {}", ledger.getName(), name,
oldReadPosition, newReadPosition);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 3fac65a3ce1..0f43eb6c5cc 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -54,6 +54,7 @@ import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.compaction.CompactedTopicUtils;
+import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.TopicCompactionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -107,9 +108,9 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
if (log.isDebugEnabled()) {
log.debug("[{}] Rewind cursor and read more entries without
delay", name);
}
- cursor.rewind();
-
Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+ cursor.rewind(activeConsumer != null &&
activeConsumer.readCompacted());
+
notifyActiveConsumerChanged(activeConsumer);
readMoreEntries(activeConsumer);
return;
@@ -127,9 +128,9 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
log.debug("[{}] Rewind cursor and read more entries after {}
ms delay", name,
serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
}
- cursor.rewind();
-
Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+ cursor.rewind(activeConsumer != null &&
activeConsumer.readCompacted());
+
notifyActiveConsumerChanged(activeConsumer);
readMoreEntries(activeConsumer);
readOnActiveConsumerTask = null;
@@ -206,7 +207,7 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
}
}
entries.forEach(Entry::release);
- cursor.rewind();
+ cursor.rewind(currentConsumer != null ?
currentConsumer.readCompacted() : readConsumer.readCompacted());
if (currentConsumer != null) {
notifyActiveConsumerChanged(currentConsumer);
readMoreEntries(currentConsumer);
@@ -301,7 +302,7 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
}
cursor.cancelPendingReadRequest();
havePendingRead = false;
- cursor.rewind();
+ cursor.rewind(consumer.readCompacted());
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Cursor rewinded, redelivering unacknowledged
messages. ", name, consumer);
}
@@ -362,7 +363,9 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
}
havePendingRead = true;
if (consumer.readCompacted()) {
- boolean readFromEarliest = isFirstRead &&
MessageId.earliest.equals(consumer.getStartMessageId());
+ boolean readFromEarliest = isFirstRead &&
MessageId.earliest.equals(consumer.getStartMessageId())
+ && (!cursor.isDurable() ||
cursor.getName().equals(Compactor.COMPACTION_SUBSCRIPTION)
+ || hasValidMarkDeletePosition(cursor));
TopicCompactionService topicCompactionService =
topic.getTopicCompactionService();
CompactedTopicUtils.asyncReadCompactedEntries(topicCompactionService, cursor,
messagesToRead,
bytesToRead, topic.getMaxReadPosition(),
readFromEarliest, this, true, consumer);
@@ -380,6 +383,13 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
}
}
+ private boolean hasValidMarkDeletePosition(ManagedCursor cursor) {
+ // If `markDeletedPosition.entryID == -1L` then the md-position is an
invalid position,
+ // since the initial md-position of the consumer will be set to it.
+ // See ManagedLedgerImpl#asyncOpenCursor and
ManagedLedgerImpl#getFirstPosition
+ return cursor.getMarkDeletedPosition() != null &&
cursor.getMarkDeletedPosition().getEntryId() == -1L;
+ }
+
@Override
protected void reScheduleRead() {
if (isRescheduleReadInProgress.compareAndSet(false, true)) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index a375ebf2809..f045492e67b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1035,7 +1035,9 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
}
private CompletableFuture<Subscription> getDurableSubscription(String
subscriptionName,
- InitialPosition initialPosition, long
startMessageRollbackDurationSec, boolean replicated,
+
InitialPosition initialPosition,
+ long
startMessageRollbackDurationSec,
+ boolean
replicated,
Map<String,
String> subscriptionProperties) {
CompletableFuture<Subscription> subscriptionFuture = new
CompletableFuture<>();
if (checkMaxSubscriptionsPerTopicExceed(subscriptionName)) {
@@ -1045,7 +1047,6 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
}
Map<String, Long> properties =
PersistentSubscription.getBaseCursorProperties(replicated);
-
ledger.asyncOpenCursor(Codec.encode(subscriptionName),
initialPosition, properties, subscriptionProperties,
new OpenCursorCallback() {
@Override
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index d13ce61753d..dfafbc41cb4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -101,7 +101,11 @@ public class CompactedTopicImpl implements CompactedTopic {
boolean isFirstRead,
ReadEntriesCallback callback, Consumer
consumer) {
PositionImpl cursorPosition;
- if (isFirstRead &&
MessageId.earliest.equals(consumer.getStartMessageId())){
+ boolean readFromEarliest = isFirstRead &&
MessageId.earliest.equals(consumer.getStartMessageId())
+ && (!cursor.isDurable() ||
cursor.getName().equals(Compactor.COMPACTION_SUBSCRIPTION)
+ || cursor.getMarkDeletedPosition() == null
+ || cursor.getMarkDeletedPosition().getEntryId() == -1L);
+ if (readFromEarliest){
cursorPosition = PositionImpl.EARLIEST;
} else {
cursorPosition = (PositionImpl) cursor.getReadPosition();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
index fe519827be7..4cc3a9ada7d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
@@ -52,6 +52,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
@@ -868,6 +869,7 @@ public class ReplicatorSubscriptionTest extends
ReplicatorTestBase {
.topic(topicName)
.subscriptionName("sub2")
.subscriptionType(SubscriptionType.Exclusive)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.readCompacted(true)
.subscribe();
List<String> result = new ArrayList<>();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index e67049451f3..81ed4311374 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -1893,6 +1893,7 @@ public class TransactionTest extends TransactionTestBase {
.topic(topic)
.subscriptionName("sub")
.subscriptionType(SubscriptionType.Exclusive)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.readCompacted(true)
.subscribe();
List<String> result = new ArrayList<>();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
index 64a5da43d44..4e4dc8273d3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -785,4 +785,32 @@ public class ReaderTest extends
MockedPulsarServiceBaseTest {
admin.topics().deletePartitionedTopic(partitionedTopic);
}
+ @Test
+ public void testReaderReconnectedFromNextEntry() throws Exception {
+ final String topic =
"persistent://my-property/my-ns/testReaderReconnectedFromNextEntry";
+ Reader<String> reader =
pulsarClient.newReader(Schema.STRING).topic(topic).receiverQueueSize(1)
+ .startMessageId(MessageId.earliest).create();
+ Producer<String> producer =
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+
+ // Send 3 and consume 1.
+ producer.send("1");
+ producer.send("2");
+ producer.send("3");
+ Message<String> msg1 = reader.readNext(2, TimeUnit.SECONDS);
+ assertEquals(msg1.getValue(), "1");
+
+ // Trigger reader reconnect.
+ admin.topics().unload(topic);
+
+ // For non-durable we are going to restart from the next entry.
+ Message<String> msg2 = reader.readNext(2, TimeUnit.SECONDS);
+ assertEquals(msg2.getValue(), "2");
+ Message<String> msg3 = reader.readNext(2, TimeUnit.SECONDS);
+ assertEquals(msg3.getValue(), "3");
+
+ // cleanup.
+ reader.close();
+ producer.close();
+ admin.topics().delete(topic, false);
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 698ab15940b..f0010096b1e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -1875,6 +1875,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
ConsumerImpl<String> consumer = (ConsumerImpl<String>)
client.newConsumer(Schema.STRING)
.topic(topicName).readCompacted(true).receiverQueueSize(receiveQueueSize).subscriptionName(subName)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
//Give some time to consume
@@ -1918,6 +1919,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>)
client.newConsumer(Schema.BYTES)
.topic(topicName).readCompacted(true).receiverQueueSize(receiveQueueSize).subscriptionName(subName)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
Awaitility.await().untilAsserted(() -> {
@@ -2190,9 +2192,11 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
});
@Cleanup
- Consumer<String> consumer =
-
pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subName).readCompacted(true)
- .subscribe();
+ Consumer<String> consumer =
pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+ .subscriptionName("sub-2")
+ .readCompacted(true)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
List<String> result = new ArrayList<>();
while (true) {
@@ -2206,4 +2210,158 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
Assert.assertEquals(result, List.of("V3", "V4", "V5"));
}
+
+ @Test
+ public void testAcknowledgeWithReconnection() throws Exception {
+ final String topicName =
"persistent://my-property/use/my-ns/testAcknowledge" + UUID.randomUUID();
+ final String subName = "my-sub";
+ @Cleanup
+ PulsarClient client = newPulsarClient(lookupUrl.toString(), 100);
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .enableBatching(false).topic(topicName).create();
+
+ List<String> expected = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+
producer.newMessage().key(String.valueOf(i)).value(String.valueOf(i)).send();
+ expected.add(String.valueOf(i));
+ }
+ producer.flush();
+
+ admin.topics().triggerCompaction(topicName);
+
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(admin.topics().compactionStatus(topicName).status,
+ LongRunningProcessStatus.Status.SUCCESS);
+ });
+
+ // trim the topic
+ admin.topics().unload(topicName);
+
+ Awaitility.await().untilAsserted(() -> {
+ PersistentTopicInternalStats internalStats =
admin.topics().getInternalStats(topicName, false);
+ assertEquals(internalStats.numberOfEntries, 0);
+ });
+
+ ConsumerImpl<String> consumer = (ConsumerImpl<String>)
client.newConsumer(Schema.STRING)
+
.topic(topicName).readCompacted(true).receiverQueueSize(1).subscriptionName(subName)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .isAckReceiptEnabled(true)
+ .subscribe();
+
+ List<String> results = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
+ if (message == null) {
+ break;
+ }
+ results.add(message.getValue());
+ consumer.acknowledge(message);
+ }
+
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(admin.topics().getStats(topicName,
true).getSubscriptions().get(subName).getMsgBacklog(),
+ 5));
+
+ // Make consumer reconnect to broker
+ admin.topics().unload(topicName);
+
+ // Wait for consumer to reconnect and clear incomingMessages
+ consumer.pause();
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertEquals(consumer.numMessagesInQueue(), 0);
+ });
+ consumer.resume();
+
+ for (int i = 0; i < 5; i++) {
+ Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
+ if (message == null) {
+ break;
+ }
+ results.add(message.getValue());
+ consumer.acknowledge(message);
+ }
+
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(admin.topics().getStats(topicName,
true).getSubscriptions().get(subName).getMsgBacklog(),
+ 0));
+
+ Assert.assertEquals(results, expected);
+
+ Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
+ Assert.assertNull(message);
+
+ // Make consumer reconnect to broker
+ admin.topics().unload(topicName);
+
+ producer.newMessage().key("K").value("V").send();
+ Message<String> message2 = consumer.receive(3, TimeUnit.SECONDS);
+ Assert.assertEquals(message2.getValue(), "V");
+ consumer.acknowledge(message2);
+
+ Awaitility.await().untilAsserted(() -> {
+ PersistentTopicInternalStats internalStats =
admin.topics().getInternalStats(topicName);
+ Assert.assertEquals(internalStats.lastConfirmedEntry,
+ internalStats.cursors.get(subName).markDeletePosition);
+ });
+
+ consumer.close();
+ producer.close();
+ }
+
+ @Test
+ public void testEarliestSubsAfterRollover() throws Exception {
+ final String topicName =
"persistent://my-property/use/my-ns/testEarliestSubsAfterRollover" +
UUID.randomUUID();
+ final String subName = "my-sub";
+ @Cleanup
+ PulsarClient client = newPulsarClient(lookupUrl.toString(), 100);
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .enableBatching(false).topic(topicName).create();
+
+ List<String> expected = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+
producer.newMessage().key(String.valueOf(i)).value(String.valueOf(i)).send();
+ expected.add(String.valueOf(i));
+ }
+ producer.flush();
+
+ admin.topics().triggerCompaction(topicName);
+
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(admin.topics().compactionStatus(topicName).status,
+ LongRunningProcessStatus.Status.SUCCESS);
+ });
+
+ // trim the topic
+ admin.topics().unload(topicName);
+
+ Awaitility.await().untilAsserted(() -> {
+ PersistentTopicInternalStats internalStats =
admin.topics().getInternalStats(topicName, false);
+ assertEquals(internalStats.numberOfEntries, 0);
+ });
+
+ // Make ml.getFirstPosition() return new ledger first position
+ producer.newMessage().key("K").value("V").send();
+ expected.add("V");
+
+ @Cleanup
+ ConsumerImpl<String> consumer = (ConsumerImpl<String>)
client.newConsumer(Schema.STRING)
+
.topic(topicName).readCompacted(true).receiverQueueSize(1).subscriptionName(subName)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .isAckReceiptEnabled(true)
+ .subscribe();
+
+ List<String> results = new ArrayList<>();
+ while (true) {
+ Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
+ if (message == null) {
+ break;
+ }
+
+ results.add(message.getValue());
+ consumer.acknowledge(message);
+ }
+
+ Assert.assertEquals(results, expected);
+ }
}