This is an automated email from the ASF dual-hosted git repository.

daojun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f72817aefe [fix] [txn] Get previous position by managed ledger. 
(#22024)
1f72817aefe is described below

commit 1f72817aefe9ce9972a0108450711d7074c9dc7f
Author: thetumbled <[email protected]>
AuthorDate: Thu Feb 22 12:07:43 2024 +0800

    [fix] [txn] Get previous position by managed ledger. (#22024)
---
 .../buffer/impl/TopicTransactionBuffer.java        |  4 +-
 .../buffer/TopicTransactionBufferTest.java         | 58 ++++++++++++++++++++++
 2 files changed, 60 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index f356921d698..5392e473947 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -287,8 +287,8 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
                 .checkAbortedTransaction(txnId)) {
             ongoingTxns.put(txnId, (PositionImpl) position);
             PositionImpl firstPosition = 
ongoingTxns.get(ongoingTxns.firstKey());
-            //max read position is less than first ongoing transaction message 
position, so entryId -1
-            maxReadPosition = PositionImpl.get(firstPosition.getLedgerId(), 
firstPosition.getEntryId() - 1);
+            // max read position is less than first ongoing transaction 
message position
+            maxReadPosition = ((ManagedLedgerImpl) 
topic.getManagedLedger()).getPreviousPosition(firstPosition);
         }
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
index e5ad910cb1f..fad785cc882 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
@@ -40,8 +40,10 @@ import 
org.apache.pulsar.broker.transaction.TransactionTestBase;
 import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
 import 
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.TopicMessageId;
 import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -304,6 +306,62 @@ public class TopicTransactionBufferTest extends 
TransactionTestBase {
         assertMessageId(consumer, expectedLastMessageID2, 2);
     }
 
+    /**
+     * produce 3 messages and then trigger a ledger switch,
+     * then create a transaction and send a transactional message.
+     * As there are messages in the new ledger, the reader should be able to 
read the messages.
+     * But reader.hasMessageAvailable() returns false if the entry id of  max 
read position is -1.
+     * @throws Exception
+     */
+    @Test
+    public void testGetLastMessageIdsWithOpenTransactionAtLedgerHead() throws 
Exception {
+        String topic = "persistent://" + NAMESPACE1 + 
"/testGetLastMessageIdsWithOpenTransactionAtLedgerHead";
+        String subName = "my-subscription";
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .create();
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName(subName)
+                .subscribe();
+        MessageId expectedLastMessageID = null;
+        for (int i = 0; i < 3; i++) {
+            expectedLastMessageID = 
producer.newMessage().value(String.valueOf(i).getBytes()).send();
+            System.out.println("expectedLastMessageID: " + 
expectedLastMessageID);
+        }
+        triggerLedgerSwitch(topic);
+        Transaction txn = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.HOURS)
+                .build()
+                .get();
+        producer.newMessage(txn).send();
+
+        Reader<byte[]> reader = pulsarClient.newReader()
+                .topic(topic)
+                .startMessageId(MessageId.earliest)
+                .create();
+        assertTrue(reader.hasMessageAvailable());
+    }
+
+    private void triggerLedgerSwitch(String topicName) throws Exception{
+        admin.topics().unload(topicName);
+        Awaitility.await().until(() -> {
+            CompletableFuture<Optional<Topic>> topicFuture =
+                    
getPulsarServiceList().get(0).getBrokerService().getTopic(topicName, false);
+            if (!topicFuture.isDone() || 
topicFuture.isCompletedExceptionally()){
+                return false;
+            }
+            Optional<Topic> topicOptional = topicFuture.join();
+            if (!topicOptional.isPresent()){
+                return false;
+            }
+            PersistentTopic persistentTopic = (PersistentTopic) 
topicOptional.get();
+            ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) 
persistentTopic.getManagedLedger();
+            return managedLedger.getState() == 
ManagedLedgerImpl.State.LedgerOpened;
+        });
+    }
+
     private void assertMessageId(Consumer<?> consumer, MessageIdImpl expected, 
int entryOffset) throws Exception {
         TopicMessageIdImpl actual = (TopicMessageIdImpl) 
consumer.getLastMessageIds().get(0);
         assertEquals(expected.getEntryId(), actual.getEntryId() - entryOffset);

Reply via email to