This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 763f90f6dd3 [fix] Fix Reader can be stuck from transaction aborted 
messages. (#22610)
763f90f6dd3 is described below

commit 763f90f6dd317819d93990348bfc8519029c727d
Author: 道君 <dao...@apache.org>
AuthorDate: Tue May 7 20:45:16 2024 +0800

    [fix] Fix Reader can be stuck from transaction aborted messages. (#22610)
---
 .../mledger/util/ManagedLedgerImplUtils.java       | 17 ++----
 .../broker/service/persistent/PersistentTopic.java | 24 ++++----
 .../pulsar/broker/transaction/TransactionTest.java | 68 ++++++++++++++++++++++
 .../buffer/TopicTransactionBufferTest.java         | 36 ++++++++----
 4 files changed, 110 insertions(+), 35 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java
index cd8671b0e62..01de115290a 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java
@@ -38,11 +38,7 @@ public class ManagedLedgerImplUtils {
                                                                         final 
Predicate<Entry> predicate,
                                                                         final 
PositionImpl startPosition) {
         CompletableFuture<Position> future = new CompletableFuture<>();
-        if (!ledger.isValidPosition(startPosition)) {
-            future.complete(startPosition);
-        } else {
-            internalAsyncReverseFindPositionOneByOne(ledger, predicate, 
startPosition, future);
-        }
+        internalAsyncReverseFindPositionOneByOne(ledger, predicate, 
startPosition, future);
         return future;
     }
 
@@ -50,6 +46,10 @@ public class ManagedLedgerImplUtils {
                                                                  final 
Predicate<Entry> predicate,
                                                                  final 
PositionImpl position,
                                                                  final 
CompletableFuture<Position> future) {
+        if (!ledger.isValidPosition(position)) {
+            future.complete(position);
+            return;
+        }
         ledger.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() 
{
             @Override
             public void readEntryComplete(Entry entry, Object ctx) {
@@ -60,12 +60,7 @@ public class ManagedLedgerImplUtils {
                         return;
                     }
                     PositionImpl previousPosition = 
ledger.getPreviousPosition((PositionImpl) position);
-                    if (!ledger.isValidPosition(previousPosition)) {
-                        future.complete(previousPosition);
-                    } else {
-                        internalAsyncReverseFindPositionOneByOne(ledger, 
predicate,
-                                ledger.getPreviousPosition((PositionImpl) 
position), future);
-                    }
+                    internalAsyncReverseFindPositionOneByOne(ledger, 
predicate, previousPosition, future);
                 } catch (Exception e) {
                     future.completeExceptionally(e);
                 } finally {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 60eb700fc06..fa731b860f7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3561,18 +3561,18 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
     @Override
     public CompletableFuture<Position> getLastDispatchablePosition() {
-        PositionImpl maxReadPosition = getMaxReadPosition();
-        // If `maxReadPosition` is not equal to `LastPosition`. It means that 
there are uncommitted transactions.
-        // so return `maxRedPosition` directly.
-        if (maxReadPosition.compareTo((PositionImpl) getLastPosition()) != 0) {
-            return CompletableFuture.completedFuture(maxReadPosition);
-        } else {
-            return 
ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, 
entry -> {
-                MessageMetadata md = 
Commands.parseMessageMetadata(entry.getDataBuffer());
-                // If a messages has marker will filter by 
AbstractBaseDispatcher.filterEntriesForConsumer
-                return !Markers.isServerOnlyMarker(md);
-            }, maxReadPosition);
-        }
+        return 
ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, 
entry -> {
+            MessageMetadata md = 
Commands.parseMessageMetadata(entry.getDataBuffer());
+            // A server-only marker message will be filtered out by 
AbstractBaseDispatcher.filterEntriesForConsumer
+            if (Markers.isServerOnlyMarker(md)) {
+                return false;
+            } else if (md.hasTxnidMostBits() && md.hasTxnidLeastBits()) {
+                // Filter-out transaction aborted messages.
+                TxnID txnID = new TxnID(md.getTxnidMostBits(), 
md.getTxnidLeastBits());
+                return !isTxnAborted(txnID, (PositionImpl) 
entry.getPosition());
+            }
+            return true;
+        }, getMaxReadPosition());
     }
 
     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 6546d08c9af..28dc2f8972c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -1909,4 +1909,72 @@ public class TransactionTest extends TransactionTestBase 
{
         Assert.assertEquals(result, List.of("V4", "V5", "V6"));
     }
 
+    @Test
+    public void testPersistentTopicGetLastDispatchablePositionWithTxn() throws 
Exception {
+        String topic = "persistent://" + NAMESPACE1 + 
"/testPersistentTopicGetLastDispatchablePositionWithTxn";
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+
+        BrokerService brokerService = 
pulsarTestContexts.get(0).getBrokerService();
+        PersistentTopic persistentTopic = (PersistentTopic) 
brokerService.getTopicReference(topic).get();
+
+
+        // send a normal message
+        String body = UUID.randomUUID().toString();
+        MessageIdImpl msgId = (MessageIdImpl) producer.send(body);
+
+        // send 3 txn messages
+        Transaction txn = pulsarClient.newTransaction().build().get();
+        producer.newMessage(txn).value(UUID.randomUUID().toString()).send();
+        producer.newMessage(txn).value(UUID.randomUUID().toString()).send();
+        producer.newMessage(txn).value(UUID.randomUUID().toString()).send();
+
+        // get last dispatchable position
+        PositionImpl lastDispatchablePosition = (PositionImpl) 
persistentTopic.getLastDispatchablePosition().get();
+        // the last dispatchable position should be the message id of the 
normal message
+        assertEquals(lastDispatchablePosition, 
PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId()));
+
+        // abort the txn
+        txn.abort().get(5, TimeUnit.SECONDS);
+
+        // get last dispatchable position
+        lastDispatchablePosition = (PositionImpl) 
persistentTopic.getLastDispatchablePosition().get();
+        // the last dispatchable position should be the message id of the 
normal message
+        assertEquals(lastDispatchablePosition, 
PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId()));
+
+
+        @Cleanup
+        Reader<String> reader = pulsarClient.newReader(Schema.STRING)
+                .topic(topic)
+                .startMessageId(MessageId.earliest)
+                .create();
+        Transaction txn1 = pulsarClient.newTransaction().build().get();
+        producer.newMessage(txn1).value(UUID.randomUUID().toString()).send();
+        producer.newMessage(txn1).value(UUID.randomUUID().toString()).send();
+        producer.newMessage(txn1).value(UUID.randomUUID().toString()).send();
+        List<Message<String>> messages = new ArrayList<>();
+        while (reader.hasMessageAvailable()) {
+            messages.add(reader.readNext());
+        }
+        assertEquals(messages.size(), 1);
+        assertEquals(messages.get(0).getValue(), body);
+
+        txn1.abort().get(5, TimeUnit.SECONDS);
+
+        @Cleanup
+        Reader<String> reader1 = pulsarClient.newReader(Schema.STRING)
+                .topic(topic)
+                .startMessageId(MessageId.earliest)
+                .create();
+        List<Message<String>> messages1 = new ArrayList<>();
+        while (reader1.hasMessageAvailable()) {
+            messages1.add(reader1.readNext());
+        }
+        assertEquals(messages1.size(), 1);
+        assertEquals(messages1.get(0).getValue(), body);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
index b0903b00be3..f93cfbcdc50 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
@@ -280,9 +280,9 @@ public class TopicTransactionBufferTest extends 
TransactionTestBase {
         for (int i = 0; i < 3; i++) {
             expectedLastMessageID = (MessageIdImpl) 
producer.newMessage().send();
         }
-        assertMessageId(consumer, expectedLastMessageID);
+        assertGetLastMessageId(consumer, expectedLastMessageID);
         // 2.2 Case2: send 2 ongoing transactional messages and 2 original 
messages.
-        // |1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5|1:6|.
+        // |1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5.
         Transaction txn1 = pulsarClient.newTransaction()
                 .withTransactionTimeout(5, TimeUnit.HOURS)
                 .build()
@@ -291,25 +291,37 @@ public class TopicTransactionBufferTest extends 
TransactionTestBase {
                 .withTransactionTimeout(5, TimeUnit.HOURS)
                 .build()
                 .get();
+
+        // |1:0|1:1|1:2|txn1:1:3|
         producer.newMessage(txn1).send();
-        // expectedLastMessageID1 == 1:4
+
+        // |1:0|1:1|1:2|txn1:1:3|1:4|
         MessageIdImpl expectedLastMessageID1 = (MessageIdImpl) 
producer.newMessage().send();
+
+        // |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|
         producer.newMessage(txn2).send();
-        // expectedLastMessageID2 == 1:6
-        MessageIdImpl expectedLastMessageID2 = (MessageIdImpl) 
producer.newMessage().send();
 
         // 2.2.1 Last message ID will not change when txn1 and txn2 do not end.
-        assertMessageId(consumer, expectedLastMessageID);
+        assertGetLastMessageId(consumer, expectedLastMessageID);
 
         // 2.2.2 Last message ID will update to 1:4 when txn1 committed.
-        // 
|1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5|1:6|tx1:commit->1:7|
+        // |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|tx1:commit->1:6|
         txn1.commit().get(5, TimeUnit.SECONDS);
-        assertMessageId(consumer, expectedLastMessageID1);
+        assertGetLastMessageId(consumer, expectedLastMessageID1);
 
-        // 2.2.3 Last message ID will update to 1:6 when txn2 aborted.
-        // 
|1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5|1:6|tx1:commit->1:7|tx2:abort->1:8|
+        // 2.2.3 Last message ID will still be 1:4 when txn2 is aborted.
+        // |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|tx1:commit->1:6|tx2:abort->1:7|
         txn2.abort().get(5, TimeUnit.SECONDS);
-        assertMessageId(consumer, expectedLastMessageID2);
+        assertGetLastMessageId(consumer, expectedLastMessageID1);
+
+        // Handle the case of the maxReadPosition < lastPosition, but it's an 
aborted transactional message.
+        Transaction txn3 = pulsarClient.newTransaction()
+                .build()
+                .get();
+        producer.newMessage(txn3).send();
+        assertGetLastMessageId(consumer, expectedLastMessageID1);
+        txn3.abort().get(5, TimeUnit.SECONDS);
+        assertGetLastMessageId(consumer, expectedLastMessageID1);
     }
 
     /**
@@ -368,7 +380,7 @@ public class TopicTransactionBufferTest extends 
TransactionTestBase {
         });
     }
 
-    private void assertMessageId(Consumer<?> consumer, MessageIdImpl expected) 
throws Exception {
+    private void assertGetLastMessageId(Consumer<?> consumer, MessageIdImpl 
expected) throws Exception {
         TopicMessageIdImpl actual = (TopicMessageIdImpl) 
consumer.getLastMessageIds().get(0);
         assertEquals(expected.getEntryId(), actual.getEntryId());
         assertEquals(expected.getLedgerId(), actual.getLedgerId());

Reply via email to