This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push: new 763f90f6dd3 [fix] Fix Reader can be stuck from transaction aborted messages. (#22610) 763f90f6dd3 is described below commit 763f90f6dd317819d93990348bfc8519029c727d Author: 道君 <dao...@apache.org> AuthorDate: Tue May 7 20:45:16 2024 +0800 [fix] Fix Reader can be stuck from transaction aborted messages. (#22610) --- .../mledger/util/ManagedLedgerImplUtils.java | 17 ++---- .../broker/service/persistent/PersistentTopic.java | 24 ++++---- .../pulsar/broker/transaction/TransactionTest.java | 68 ++++++++++++++++++++++ .../buffer/TopicTransactionBufferTest.java | 36 ++++++++---- 4 files changed, 110 insertions(+), 35 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java index cd8671b0e62..01de115290a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java @@ -38,11 +38,7 @@ public class ManagedLedgerImplUtils { final Predicate<Entry> predicate, final PositionImpl startPosition) { CompletableFuture<Position> future = new CompletableFuture<>(); - if (!ledger.isValidPosition(startPosition)) { - future.complete(startPosition); - } else { - internalAsyncReverseFindPositionOneByOne(ledger, predicate, startPosition, future); - } + internalAsyncReverseFindPositionOneByOne(ledger, predicate, startPosition, future); return future; } @@ -50,6 +46,10 @@ public class ManagedLedgerImplUtils { final Predicate<Entry> predicate, final PositionImpl position, final CompletableFuture<Position> future) { + if (!ledger.isValidPosition(position)) { + future.complete(position); + return; + } ledger.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() { @Override public void readEntryComplete(Entry entry, Object ctx) { @@ -60,12 +60,7 @@ public class ManagedLedgerImplUtils { return; } PositionImpl previousPosition = ledger.getPreviousPosition((PositionImpl) position); - if (!ledger.isValidPosition(previousPosition)) { - future.complete(previousPosition); - } else { - internalAsyncReverseFindPositionOneByOne(ledger, predicate, - ledger.getPreviousPosition((PositionImpl) position), future); - } + internalAsyncReverseFindPositionOneByOne(ledger, predicate, previousPosition, future); } catch (Exception e) { future.completeExceptionally(e); } finally { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 60eb700fc06..fa731b860f7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -3561,18 +3561,18 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal @Override public CompletableFuture<Position> getLastDispatchablePosition() { - PositionImpl maxReadPosition = getMaxReadPosition(); - // If `maxReadPosition` is not equal to `LastPosition`. It means that there are uncommitted transactions. - // so return `maxRedPosition` directly. - if (maxReadPosition.compareTo((PositionImpl) getLastPosition()) != 0) { - return CompletableFuture.completedFuture(maxReadPosition); - } else { - return ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, entry -> { - MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer()); - // If a messages has marker will filter by AbstractBaseDispatcher.filterEntriesForConsumer - return !Markers.isServerOnlyMarker(md); - }, maxReadPosition); - } + return ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, entry -> { + MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer()); + // If a messages has marker will filter by AbstractBaseDispatcher.filterEntriesForConsumer + if (Markers.isServerOnlyMarker(md)) { + return false; + } else if (md.hasTxnidMostBits() && md.hasTxnidLeastBits()) { + // Filter-out transaction aborted messages. + TxnID txnID = new TxnID(md.getTxnidMostBits(), md.getTxnidLeastBits()); + return !isTxnAborted(txnID, (PositionImpl) entry.getPosition()); + } + return true; + }, getMaxReadPosition()); } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 6546d08c9af..28dc2f8972c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -1909,4 +1909,72 @@ public class TransactionTest extends TransactionTestBase { Assert.assertEquals(result, List.of("V4", "V5", "V6")); } + @Test + public void testPersistentTopicGetLastDispatchablePositionWithTxn() throws Exception { + String topic = "persistent://" + NAMESPACE1 + "/testPersistentTopicGetLastDispatchablePositionWithTxn"; + + @Cleanup + Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .enableBatching(false) + .create(); + + BrokerService brokerService = pulsarTestContexts.get(0).getBrokerService(); + PersistentTopic persistentTopic = (PersistentTopic) brokerService.getTopicReference(topic).get(); + + + // send a normal message + String body = UUID.randomUUID().toString(); + MessageIdImpl msgId = (MessageIdImpl) producer.send(body); + + // send 3 txn messages + Transaction txn = pulsarClient.newTransaction().build().get(); + producer.newMessage(txn).value(UUID.randomUUID().toString()).send(); + producer.newMessage(txn).value(UUID.randomUUID().toString()).send(); + producer.newMessage(txn).value(UUID.randomUUID().toString()).send(); + + // get last dispatchable position + PositionImpl lastDispatchablePosition = (PositionImpl) persistentTopic.getLastDispatchablePosition().get(); + // the last dispatchable position should be the message id of the normal message + assertEquals(lastDispatchablePosition, PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId())); + + // abort the txn + txn.abort().get(5, TimeUnit.SECONDS); + + // get last dispatchable position + lastDispatchablePosition = (PositionImpl) persistentTopic.getLastDispatchablePosition().get(); + // the last dispatchable position should be the message id of the normal message + assertEquals(lastDispatchablePosition, PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId())); + + + @Cleanup + Reader<String> reader = pulsarClient.newReader(Schema.STRING) + .topic(topic) + .startMessageId(MessageId.earliest) + .create(); + Transaction txn1 = pulsarClient.newTransaction().build().get(); + producer.newMessage(txn1).value(UUID.randomUUID().toString()).send(); + producer.newMessage(txn1).value(UUID.randomUUID().toString()).send(); + producer.newMessage(txn1).value(UUID.randomUUID().toString()).send(); + List<Message<String>> messages = new ArrayList<>(); + while (reader.hasMessageAvailable()) { + messages.add(reader.readNext()); + } + assertEquals(messages.size(), 1); + assertEquals(messages.get(0).getValue(), body); + + txn1.abort().get(5, TimeUnit.SECONDS); + + @Cleanup + Reader<String> reader1 = pulsarClient.newReader(Schema.STRING) + .topic(topic) + .startMessageId(MessageId.earliest) + .create(); + List<Message<String>> messages1 = new ArrayList<>(); + while (reader1.hasMessageAvailable()) { + messages1.add(reader1.readNext()); + } + assertEquals(messages1.size(), 1); + assertEquals(messages1.get(0).getValue(), body); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java index b0903b00be3..f93cfbcdc50 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -280,9 +280,9 @@ public class TopicTransactionBufferTest extends TransactionTestBase { for (int i = 0; i < 3; i++) { expectedLastMessageID = (MessageIdImpl) producer.newMessage().send(); } - assertMessageId(consumer, expectedLastMessageID); + assertGetLastMessageId(consumer, expectedLastMessageID); // 2.2 Case2: send 2 ongoing transactional messages and 2 original messages. - // |1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5|1:6|. + // |1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5. Transaction txn1 = pulsarClient.newTransaction() .withTransactionTimeout(5, TimeUnit.HOURS) .build() @@ -291,25 +291,37 @@ public class TopicTransactionBufferTest extends TransactionTestBase { .withTransactionTimeout(5, TimeUnit.HOURS) .build() .get(); + + // |1:0|1:1|1:2|txn1:1:3| producer.newMessage(txn1).send(); - // expectedLastMessageID1 == 1:4 + + // |1:0|1:1|1:2|txn1:1:3|1:4| MessageIdImpl expectedLastMessageID1 = (MessageIdImpl) producer.newMessage().send(); + + // |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5| producer.newMessage(txn2).send(); - // expectedLastMessageID2 == 1:6 - MessageIdImpl expectedLastMessageID2 = (MessageIdImpl) producer.newMessage().send(); // 2.2.1 Last message ID will not change when txn1 and txn2 do not end. - assertMessageId(consumer, expectedLastMessageID); + assertGetLastMessageId(consumer, expectedLastMessageID); // 2.2.2 Last message ID will update to 1:4 when txn1 committed. - // |1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5|1:6|tx1:commit->1:7| + // |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|tx1:commit->1:6| txn1.commit().get(5, TimeUnit.SECONDS); - assertMessageId(consumer, expectedLastMessageID1); + assertGetLastMessageId(consumer, expectedLastMessageID1); - // 2.2.3 Last message ID will update to 1:6 when txn2 aborted. - // |1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5|1:6|tx1:commit->1:7|tx2:abort->1:8| + // 2.2.3 Last message ID will still to 1:4 when txn2 aborted. + // |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|tx1:commit->1:6|tx2:abort->1:7| txn2.abort().get(5, TimeUnit.SECONDS); - assertMessageId(consumer, expectedLastMessageID2); + assertGetLastMessageId(consumer, expectedLastMessageID1); + + // Handle the case of the maxReadPosition < lastPosition, but it's an aborted transactional message. + Transaction txn3 = pulsarClient.newTransaction() + .build() + .get(); + producer.newMessage(txn3).send(); + assertGetLastMessageId(consumer, expectedLastMessageID1); + txn3.abort().get(5, TimeUnit.SECONDS); + assertGetLastMessageId(consumer, expectedLastMessageID1); } /** @@ -368,7 +380,7 @@ public class TopicTransactionBufferTest extends TransactionTestBase { }); } - private void assertMessageId(Consumer<?> consumer, MessageIdImpl expected) throws Exception { + private void assertGetLastMessageId(Consumer<?> consumer, MessageIdImpl expected) throws Exception { TopicMessageIdImpl actual = (TopicMessageIdImpl) consumer.getLastMessageIds().get(0); assertEquals(expected.getEntryId(), actual.getEntryId()); assertEquals(expected.getLedgerId(), actual.getLedgerId());