This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 78cda11de37601c41e9a67fc1f36b1a21992f1c1
Author: Xiangying Meng <[email protected]>
AuthorDate: Wed Dec 13 10:21:11 2023 +0800

    [fix][txn] Fix getting last message ID when there are ongoing transactions 
(#21466)
    
    (cherry picked from commit 50007c343ad911edf5654786a7e3a1fc10901091)
---
 .../apache/pulsar/broker/service/ServerCnx.java    |  37 +++---
 .../broker/service/persistent/PersistentTopic.java |   4 +-
 .../buffer/impl/InMemTransactionBuffer.java        |   5 +-
 .../buffer/impl/TopicTransactionBuffer.java        |   3 +-
 .../buffer/impl/TransactionBufferDisable.java      |   8 +-
 .../buffer/TopicTransactionBufferTest.java         | 126 +++++++++++++++++++++
 6 files changed, 161 insertions(+), 22 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 224a4489478..d118ff0c31e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1927,23 +1927,28 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             long requestId = getLastMessageId.getRequestId();
 
             Topic topic = consumer.getSubscription().getTopic();
-            Position lastPosition = topic.getLastPosition();
-            int partitionIndex = TopicName.getPartitionIndex(topic.getName());
-
-            Position markDeletePosition = null;
-            if (consumer.getSubscription() instanceof PersistentSubscription) {
-                markDeletePosition = ((PersistentSubscription) 
consumer.getSubscription()).getCursor()
-                        .getMarkDeletedPosition();
-            }
-
-            getLargestBatchIndexWhenPossible(
-                    topic,
-                    (PositionImpl) lastPosition,
-                    (PositionImpl) markDeletePosition,
-                    partitionIndex,
-                    requestId,
-                    consumer.getSubscription().getName());
+            topic.checkIfTransactionBufferRecoverCompletely(true).thenRun(() 
-> {
+                Position lastPosition = ((PersistentTopic) 
topic).getMaxReadPosition();
+                int partitionIndex = 
TopicName.getPartitionIndex(topic.getName());
+
+                Position markDeletePosition = null;
+                if (consumer.getSubscription() instanceof 
PersistentSubscription) {
+                    markDeletePosition = ((PersistentSubscription) 
consumer.getSubscription()).getCursor()
+                            .getMarkDeletedPosition();
+                }
 
+                getLargestBatchIndexWhenPossible(
+                        topic,
+                        (PositionImpl) lastPosition,
+                        (PositionImpl) markDeletePosition,
+                        partitionIndex,
+                        requestId,
+                        consumer.getSubscription().getName());
+            }).exceptionally(e -> {
+                
ctx.writeAndFlush(Commands.newError(getLastMessageId.getRequestId(),
+                        ServerError.UnknownError, "Failed to recover 
Transaction Buffer."));
+                return null;
+            });
         } else {
             
ctx.writeAndFlush(Commands.newError(getLastMessageId.getRequestId(),
                     ServerError.MetadataError, "Consumer not found"));
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 71696007049..61654339203 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -330,7 +330,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             this.transactionBuffer = brokerService.getPulsar()
                     .getTransactionBufferProvider().newTransactionBuffer(this);
         } else {
-            this.transactionBuffer = new TransactionBufferDisable();
+            this.transactionBuffer = new TransactionBufferDisable(this);
         }
         transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) 
ledger.getLastConfirmedEntry());
     }
@@ -406,7 +406,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             this.transactionBuffer = brokerService.getPulsar()
                     .getTransactionBufferProvider().newTransactionBuffer(this);
         } else {
-            this.transactionBuffer = new TransactionBufferDisable();
+            this.transactionBuffer = new TransactionBufferDisable(this);
         }
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
index c4a9fc2b774..7bd9405fd6b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
@@ -211,9 +211,12 @@ class InMemTransactionBuffer implements TransactionBuffer {
 
     final ConcurrentMap<TxnID, TxnBuffer> buffers;
     final Map<Long, Set<TxnID>> txnIndex;
+    private final Topic topic;
+
     public InMemTransactionBuffer(Topic topic) {
         this.buffers = new ConcurrentHashMap<>();
         this.txnIndex = new HashMap<>();
+        this.topic = topic;
     }
 
     @Override
@@ -371,7 +374,7 @@ class InMemTransactionBuffer implements TransactionBuffer {
 
     @Override
     public PositionImpl getMaxReadPosition() {
-        return PositionImpl.LATEST;
+        return (PositionImpl) topic.getLastPosition();
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index ebf61c3f07f..61bd3b29ab6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -515,8 +515,7 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
         ongoingTxns.remove(txnID);
         if (!ongoingTxns.isEmpty()) {
             PositionImpl position = ongoingTxns.get(ongoingTxns.firstKey());
-            //max read position is less than first ongoing transaction message 
position, so entryId -1
-            maxReadPosition = PositionImpl.get(position.getLedgerId(), 
position.getEntryId() - 1);
+            maxReadPosition = ((ManagedLedgerImpl) 
topic.getManagedLedger()).getPreviousPosition(position);
         } else {
             maxReadPosition = (PositionImpl) 
topic.getManagedLedger().getLastConfirmedEntry();
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
index d700195416c..b2a8991189d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
@@ -25,6 +25,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
 import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
@@ -39,6 +40,11 @@ import org.apache.pulsar.common.util.FutureUtil;
 @Slf4j
 public class TransactionBufferDisable implements TransactionBuffer {
 
+    private final Topic topic;
+    public TransactionBufferDisable(Topic topic) {
+        this.topic = topic;
+    }
+
     @Override
     public CompletableFuture<TransactionMeta> getTransactionMeta(TxnID txnID) {
         return CompletableFuture.completedFuture(null);
@@ -90,7 +96,7 @@ public class TransactionBufferDisable implements 
TransactionBuffer {
 
     @Override
     public PositionImpl getMaxReadPosition() {
-        return PositionImpl.LATEST;
+        return (PositionImpl) topic.getLastPosition();
     }
 
     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
index 514e135f350..cf530d7ce22 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
@@ -18,9 +18,17 @@
  */
 package org.apache.pulsar.broker.transaction.buffer;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.when;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertTrue;
+import static org.testng.AssertJUnit.fail;
+import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.Topic;
@@ -29,8 +37,11 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
 import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
 import 
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
+import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
@@ -179,4 +190,119 @@ public class TopicTransactionBufferTest extends 
TransactionTestBase {
         Assert.assertTrue(f.isCompletedExceptionally());
     }
 
+    /**
+     * This test mainly test the following two point:
+     *      1. `getLastMessageIds` will get max read position.
+     *      Send two message |1:0|1:1|; mock max read position as |1:0|; 
`getLastMessageIds` will get |1:0|.
+     *      2. `getLastMessageIds` will wait Transaction buffer recover 
completely.
+     *      Mock `checkIfTBRecoverCompletely` return an exception, 
`getLastMessageIds` will fail too.
+     *      Mock `checkIfTBRecoverCompletely` return null, `getLastMessageIds` 
will get correct result.
+     */
+    @Test
+    public void testGetMaxPositionAfterTBReady() throws Exception {
+        // 1. Prepare test environment.
+        String topic = "persistent://" + NAMESPACE1 + 
"/testGetMaxReadyPositionAfterTBReady";
+        // 1.1 Mock component.
+        TransactionBuffer transactionBuffer = 
Mockito.spy(TransactionBuffer.class);
+        when(transactionBuffer.checkIfTBRecoverCompletely(anyBoolean()))
+                // Handle producer will check transaction buffer recover 
completely.
+                .thenReturn(CompletableFuture.completedFuture(null))
+                // If the Transaction buffer failed to recover, we can not get 
the correct last max read id.
+                .thenReturn(CompletableFuture.failedFuture(new Throwable("Mock 
fail")))
+                // If the transaction buffer recover successfully, the max 
read position can be acquired successfully.
+                .thenReturn(CompletableFuture.completedFuture(null));
+        TransactionBufferProvider transactionBufferProvider = 
Mockito.spy(TransactionBufferProvider.class);
+        
Mockito.doReturn(transactionBuffer).when(transactionBufferProvider).newTransactionBuffer(any());
+        TransactionBufferProvider originalTBProvider = 
getPulsarServiceList().get(0).getTransactionBufferProvider();
+        
Mockito.doReturn(transactionBufferProvider).when(getPulsarServiceList().get(0)).getTransactionBufferProvider();
+        // 2. Building producer and consumer.
+        admin.topics().createNonPartitionedTopic(topic);
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("sub")
+                .subscribe();
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .create();
+        // 3. Send message and test the exception can be handled as expected.
+        MessageIdImpl messageId = (MessageIdImpl) producer.newMessage().send();
+        producer.newMessage().send();
+        Mockito.doReturn(new PositionImpl(messageId.getLedgerId(), 
messageId.getEntryId()))
+                .when(transactionBuffer).getMaxReadPosition();
+        try {
+            consumer.getLastMessageId();
+            fail();
+        } catch (PulsarClientException exception) {
+            assertTrue(exception.getMessage().contains("Failed to recover 
Transaction Buffer."));
+        }
+        MessageIdImpl msgId = (MessageIdImpl) consumer.getLastMessageId();
+        assertEquals(messageId.getLedgerId(), msgId.getLedgerId());
+        assertEquals(messageId.getEntryId(), msgId.getEntryId());
+        // 4. Clean resource
+        
Mockito.doReturn(originalTBProvider).when(getPulsarServiceList().get(0)).getTransactionBufferProvider();
+    }
+
+    /**
+     * Add a E2E test for the get last message ID. It tests 4 cases.
+     *     <p>
+     *         1. Only normal messages in the topic.
+     *         2. There are ongoing transactions, last message ID will not be 
updated until transaction end.
+     *         3. Aborted transaction will make the last message ID be updated 
as expected.
+     *         4. Committed transaction will make the last message ID be 
updated as expected.
+     *     </p>
+     */
+    @Test
+    public void testGetLastMessageIdsWithOngoingTransactions() throws 
Exception {
+        // 1. Prepare environment
+        String topic = "persistent://" + NAMESPACE1 + 
"/testGetLastMessageIdsWithOngoingTransactions";
+        String subName = "my-subscription";
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .create();
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName(subName)
+                .subscribe();
+
+        // 2. Test last max read position can be required correctly.
+        // 2.1 Case1: send 3 original messages. |1:0|1:1|1:2|
+        MessageIdImpl expectedLastMessageID = null;
+        for (int i = 0; i < 3; i++) {
+            expectedLastMessageID = (MessageIdImpl) 
producer.newMessage().send();
+        }
+        assertMessageId(consumer, expectedLastMessageID, 0);
+        // 2.2 Case2: send 2 ongoing transactional messages and 2 original 
messages.
+        // |1:0|1:1|1:2|txn1->1:3|1:4|txn2->1:5|1:6|.
+        Transaction txn1 = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.HOURS)
+                .build()
+                .get();
+        Transaction txn2 = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.HOURS)
+                .build()
+                .get();
+        producer.newMessage(txn1).send();
+        MessageIdImpl expectedLastMessageID1 = (MessageIdImpl) 
producer.newMessage().send();
+        producer.newMessage(txn2).send();
+        MessageIdImpl expectedLastMessageID2 = (MessageIdImpl) 
producer.newMessage().send();
+        // 2.2.1 Last message ID will not change when txn1 and txn2 do not end.
+        assertMessageId(consumer, expectedLastMessageID, 0);
+        // 2.2.2 Last message ID will update to 1:4 when txn1 committed.
+        txn1.commit().get(5, TimeUnit.SECONDS);
+        assertMessageId(consumer, expectedLastMessageID1, 0);
+        // 2.2.3 Last message ID will update to 1:6 when txn2 aborted.
+        txn2.abort().get(5, TimeUnit.SECONDS);
+        // Todo: We can not ignore the marker's position in this fix.
+        assertMessageId(consumer, expectedLastMessageID2, 2);
+    }
+
+    private void assertMessageId(Consumer<?> consumer, MessageIdImpl expected, 
int entryOffset) throws Exception {
+        MessageIdImpl msgId = (MessageIdImpl) consumer.getLastMessageId();
+        assertEquals(expected.getEntryId(), msgId.getEntryId() - entryOffset);
+        assertEquals(expected.getLedgerId(), msgId.getLedgerId());
+    }
+
 }

Reply via email to