This is an automated email from the ASF dual-hosted git repository.
daojun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7e88463d9a5 [fix] Fix Reader can be stuck from transaction aborted
messages. (#22610)
7e88463d9a5 is described below
commit 7e88463d9a598f95725bee49fd7f713bce27cf28
Author: 道君 <[email protected]>
AuthorDate: Tue May 7 20:45:16 2024 +0800
[fix] Fix Reader can be stuck from transaction aborted messages. (#22610)
---
.../mledger/util/ManagedLedgerImplUtils.java | 17 ++----
.../broker/service/persistent/PersistentTopic.java | 24 ++++----
.../pulsar/broker/transaction/TransactionTest.java | 69 ++++++++++++++++++++++
.../buffer/TopicTransactionBufferTest.java | 36 +++++++----
4 files changed, 111 insertions(+), 35 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java
index cd8671b0e62..01de115290a 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java
@@ -38,11 +38,7 @@ public class ManagedLedgerImplUtils {
final
Predicate<Entry> predicate,
final
PositionImpl startPosition) {
CompletableFuture<Position> future = new CompletableFuture<>();
- if (!ledger.isValidPosition(startPosition)) {
- future.complete(startPosition);
- } else {
- internalAsyncReverseFindPositionOneByOne(ledger, predicate,
startPosition, future);
- }
+ internalAsyncReverseFindPositionOneByOne(ledger, predicate,
startPosition, future);
return future;
}
@@ -50,6 +46,10 @@ public class ManagedLedgerImplUtils {
final
Predicate<Entry> predicate,
final
PositionImpl position,
final
CompletableFuture<Position> future) {
+ if (!ledger.isValidPosition(position)) {
+ future.complete(position);
+ return;
+ }
ledger.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback()
{
@Override
public void readEntryComplete(Entry entry, Object ctx) {
@@ -60,12 +60,7 @@ public class ManagedLedgerImplUtils {
return;
}
PositionImpl previousPosition =
ledger.getPreviousPosition((PositionImpl) position);
- if (!ledger.isValidPosition(previousPosition)) {
- future.complete(previousPosition);
- } else {
- internalAsyncReverseFindPositionOneByOne(ledger,
predicate,
- ledger.getPreviousPosition((PositionImpl)
position), future);
- }
+ internalAsyncReverseFindPositionOneByOne(ledger,
predicate, previousPosition, future);
} catch (Exception e) {
future.completeExceptionally(e);
} finally {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index e9ed8aa6edf..58ea8088dba 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3768,18 +3768,18 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
@Override
public CompletableFuture<Position> getLastDispatchablePosition() {
- PositionImpl maxReadPosition = getMaxReadPosition();
- // If `maxReadPosition` is not equal to `LastPosition`. It means that
there are uncommitted transactions.
- // so return `maxRedPosition` directly.
- if (maxReadPosition.compareTo((PositionImpl) getLastPosition()) != 0) {
- return CompletableFuture.completedFuture(maxReadPosition);
- } else {
- return
ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger,
entry -> {
- MessageMetadata md =
Commands.parseMessageMetadata(entry.getDataBuffer());
- // If a messages has marker will filter by
AbstractBaseDispatcher.filterEntriesForConsumer
- return !Markers.isServerOnlyMarker(md);
- }, maxReadPosition);
- }
+ return
ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger,
entry -> {
+ MessageMetadata md =
Commands.parseMessageMetadata(entry.getDataBuffer());
+ // If a message is a marker, it will be filtered by
AbstractBaseDispatcher.filterEntriesForConsumer
+ if (Markers.isServerOnlyMarker(md)) {
+ return false;
+ } else if (md.hasTxnidMostBits() && md.hasTxnidLeastBits()) {
+ // Filter-out transaction aborted messages.
+ TxnID txnID = new TxnID(md.getTxnidMostBits(),
md.getTxnidLeastBits());
+ return !isTxnAborted(txnID, (PositionImpl)
entry.getPosition());
+ }
+ return true;
+ }, getMaxReadPosition());
}
@Override
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index ed1b74c46e0..e8c15d193a2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -1978,4 +1978,73 @@ public class TransactionTest extends TransactionTestBase
{
+ maxDeliveryDelayInMillis + " milliseconds");
}
}
+
+ @Test
+ public void testPersistentTopicGetLastDispatchablePositionWithTxn() throws
Exception {
+ String topic = "persistent://" + NAMESPACE1 +
"/testPersistentTopicGetLastDispatchablePositionWithTxn";
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .enableBatching(false)
+ .create();
+
+ BrokerService brokerService =
pulsarTestContexts.get(0).getBrokerService();
+ PersistentTopic persistentTopic = (PersistentTopic)
brokerService.getTopicReference(topic).get();
+
+
+ // send a normal message
+ String body = UUID.randomUUID().toString();
+ MessageIdImpl msgId = (MessageIdImpl) producer.send(body);
+
+ // send 3 txn messages
+ Transaction txn = pulsarClient.newTransaction().build().get();
+ producer.newMessage(txn).value(UUID.randomUUID().toString()).send();
+ producer.newMessage(txn).value(UUID.randomUUID().toString()).send();
+ producer.newMessage(txn).value(UUID.randomUUID().toString()).send();
+
+ // get last dispatchable position
+ PositionImpl lastDispatchablePosition = (PositionImpl)
persistentTopic.getLastDispatchablePosition().get();
+ // the last dispatchable position should be the message id of the
normal message
+ assertEquals(lastDispatchablePosition,
PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId()));
+
+ // abort the txn
+ txn.abort().get(5, TimeUnit.SECONDS);
+
+ // get last dispatchable position
+ lastDispatchablePosition = (PositionImpl)
persistentTopic.getLastDispatchablePosition().get();
+ // the last dispatchable position should be the message id of the
normal message
+ assertEquals(lastDispatchablePosition,
PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId()));
+
+
+ @Cleanup
+ Reader<String> reader = pulsarClient.newReader(Schema.STRING)
+ .topic(topic)
+ .startMessageId(MessageId.earliest)
+ .create();
+ Transaction txn1 = pulsarClient.newTransaction().build().get();
+ producer.newMessage(txn1).value(UUID.randomUUID().toString()).send();
+ producer.newMessage(txn1).value(UUID.randomUUID().toString()).send();
+ producer.newMessage(txn1).value(UUID.randomUUID().toString()).send();
+ List<Message<String>> messages = new ArrayList<>();
+ while (reader.hasMessageAvailable()) {
+ messages.add(reader.readNext());
+ }
+ assertEquals(messages.size(), 1);
+ assertEquals(messages.get(0).getValue(), body);
+
+ txn1.abort().get(5, TimeUnit.SECONDS);
+
+ @Cleanup
+ Reader<String> reader1 = pulsarClient.newReader(Schema.STRING)
+ .topic(topic)
+ .startMessageId(MessageId.earliest)
+ .create();
+ List<Message<String>> messages1 = new ArrayList<>();
+ while (reader1.hasMessageAvailable()) {
+ messages1.add(reader1.readNext());
+ }
+ assertEquals(messages1.size(), 1);
+ assertEquals(messages1.get(0).getValue(), body);
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
index b0903b00be3..f93cfbcdc50 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
@@ -280,9 +280,9 @@ public class TopicTransactionBufferTest extends
TransactionTestBase {
for (int i = 0; i < 3; i++) {
expectedLastMessageID = (MessageIdImpl)
producer.newMessage().send();
}
- assertMessageId(consumer, expectedLastMessageID);
+ assertGetLastMessageId(consumer, expectedLastMessageID);
// 2.2 Case2: send 2 ongoing transactional messages and 2 original
messages.
- // |1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5|1:6|.
+ // |1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5.
Transaction txn1 = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.HOURS)
.build()
@@ -291,25 +291,37 @@ public class TopicTransactionBufferTest extends
TransactionTestBase {
.withTransactionTimeout(5, TimeUnit.HOURS)
.build()
.get();
+
+ // |1:0|1:1|1:2|txn1:1:3|
producer.newMessage(txn1).send();
- // expectedLastMessageID1 == 1:4
+
+ // |1:0|1:1|1:2|txn1:1:3|1:4|
MessageIdImpl expectedLastMessageID1 = (MessageIdImpl)
producer.newMessage().send();
+
+ // |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|
producer.newMessage(txn2).send();
- // expectedLastMessageID2 == 1:6
- MessageIdImpl expectedLastMessageID2 = (MessageIdImpl)
producer.newMessage().send();
// 2.2.1 Last message ID will not change when txn1 and txn2 do not end.
- assertMessageId(consumer, expectedLastMessageID);
+ assertGetLastMessageId(consumer, expectedLastMessageID);
// 2.2.2 Last message ID will update to 1:4 when txn1 committed.
- //
|1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5|1:6|tx1:commit->1:7|
+ // |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|tx1:commit->1:6|
txn1.commit().get(5, TimeUnit.SECONDS);
- assertMessageId(consumer, expectedLastMessageID1);
+ assertGetLastMessageId(consumer, expectedLastMessageID1);
- // 2.2.3 Last message ID will update to 1:6 when txn2 aborted.
- //
|1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5|1:6|tx1:commit->1:7|tx2:abort->1:8|
+ // 2.2.3 Last message ID will still be 1:4 when txn2 is aborted.
+ // |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|tx1:commit->1:6|tx2:abort->1:7|
txn2.abort().get(5, TimeUnit.SECONDS);
- assertMessageId(consumer, expectedLastMessageID2);
+ assertGetLastMessageId(consumer, expectedLastMessageID1);
+
+ // Handle the case of the maxReadPosition < lastPosition, but it's an
aborted transactional message.
+ Transaction txn3 = pulsarClient.newTransaction()
+ .build()
+ .get();
+ producer.newMessage(txn3).send();
+ assertGetLastMessageId(consumer, expectedLastMessageID1);
+ txn3.abort().get(5, TimeUnit.SECONDS);
+ assertGetLastMessageId(consumer, expectedLastMessageID1);
}
/**
@@ -368,7 +380,7 @@ public class TopicTransactionBufferTest extends
TransactionTestBase {
});
}
- private void assertMessageId(Consumer<?> consumer, MessageIdImpl expected)
throws Exception {
+ private void assertGetLastMessageId(Consumer<?> consumer, MessageIdImpl
expected) throws Exception {
TopicMessageIdImpl actual = (TopicMessageIdImpl)
consumer.getLastMessageIds().get(0);
assertEquals(expected.getEntryId(), actual.getEntryId());
assertEquals(expected.getLedgerId(), actual.getLedgerId());