This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 99cab24b363 [fix][txn] Fix getting last message ID when there are 
ongoing transactions (#21466)
99cab24b363 is described below

commit 99cab24b3631437a7d143b3624959505f1e19929
Author: Xiangying Meng <[email protected]>
AuthorDate: Wed Dec 13 10:21:11 2023 +0800

    [fix][txn] Fix getting last message ID when there are ongoing transactions 
(#21466)
---
 .../apache/pulsar/broker/service/ServerCnx.java    |  37 +++---
 .../broker/service/persistent/PersistentTopic.java |   4 +-
 .../buffer/impl/InMemTransactionBuffer.java        |   5 +-
 .../buffer/impl/TopicTransactionBuffer.java        |   4 +-
 .../buffer/impl/TransactionBufferDisable.java      |   8 +-
 .../buffer/TopicTransactionBufferTest.java         | 131 +++++++++++++++++++++
 6 files changed, 167 insertions(+), 22 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 4e61a3228fe..cca53bf9d6a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2063,23 +2063,28 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             long requestId = getLastMessageId.getRequestId();
 
             Topic topic = consumer.getSubscription().getTopic();
-            Position lastPosition = topic.getLastPosition();
-            int partitionIndex = TopicName.getPartitionIndex(topic.getName());
-
-            Position markDeletePosition = null;
-            if (consumer.getSubscription() instanceof PersistentSubscription) {
-                markDeletePosition = ((PersistentSubscription) 
consumer.getSubscription()).getCursor()
-                        .getMarkDeletedPosition();
-            }
-
-            getLargestBatchIndexWhenPossible(
-                    topic,
-                    (PositionImpl) lastPosition,
-                    (PositionImpl) markDeletePosition,
-                    partitionIndex,
-                    requestId,
-                    consumer.getSubscription().getName());
+            topic.checkIfTransactionBufferRecoverCompletely(true).thenRun(() 
-> {
+                Position lastPosition = ((PersistentTopic) 
topic).getMaxReadPosition();
+                int partitionIndex = 
TopicName.getPartitionIndex(topic.getName());
+
+                Position markDeletePosition = null;
+                if (consumer.getSubscription() instanceof 
PersistentSubscription) {
+                    markDeletePosition = ((PersistentSubscription) 
consumer.getSubscription()).getCursor()
+                            .getMarkDeletedPosition();
+                }
 
+                getLargestBatchIndexWhenPossible(
+                        topic,
+                        (PositionImpl) lastPosition,
+                        (PositionImpl) markDeletePosition,
+                        partitionIndex,
+                        requestId,
+                        consumer.getSubscription().getName());
+            }).exceptionally(e -> {
+                
writeAndFlush(Commands.newError(getLastMessageId.getRequestId(),
+                        ServerError.UnknownError, "Failed to recover 
Transaction Buffer."));
+                return null;
+            });
         } else {
             writeAndFlush(Commands.newError(getLastMessageId.getRequestId(),
                     ServerError.MetadataError, "Consumer not found"));
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 1619c977d1f..bbd4c31c258 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -309,7 +309,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             this.transactionBuffer = brokerService.getPulsar()
                     .getTransactionBufferProvider().newTransactionBuffer(this);
         } else {
-            this.transactionBuffer = new TransactionBufferDisable();
+            this.transactionBuffer = new TransactionBufferDisable(this);
         }
         transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) 
ledger.getLastConfirmedEntry());
         if (ledger instanceof ShadowManagedLedgerImpl) {
@@ -402,7 +402,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             this.transactionBuffer = brokerService.getPulsar()
                     .getTransactionBufferProvider().newTransactionBuffer(this);
         } else {
-            this.transactionBuffer = new TransactionBufferDisable();
+            this.transactionBuffer = new TransactionBufferDisable(this);
         }
         shadowSourceTopic = null;
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
index bc2dd58a581..978536c5f4e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
@@ -212,9 +212,12 @@ class InMemTransactionBuffer implements TransactionBuffer {
 
     final ConcurrentMap<TxnID, TxnBuffer> buffers;
     final Map<Long, Set<TxnID>> txnIndex;
+    private final Topic topic;
+
     public InMemTransactionBuffer(Topic topic) {
         this.buffers = new ConcurrentHashMap<>();
         this.txnIndex = new HashMap<>();
+        this.topic = topic;
     }
 
     @Override
@@ -372,7 +375,7 @@ class InMemTransactionBuffer implements TransactionBuffer {
 
     @Override
     public PositionImpl getMaxReadPosition() {
-        return PositionImpl.LATEST;
+        return (PositionImpl) topic.getLastPosition();
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 3c13be22086..f356921d698 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -37,6 +37,7 @@ import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.collections4.map.LinkedMap;
 import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -446,8 +447,7 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
         ongoingTxns.remove(txnID);
         if (!ongoingTxns.isEmpty()) {
             PositionImpl position = ongoingTxns.get(ongoingTxns.firstKey());
-            //max read position is less than first ongoing transaction message 
position, so entryId -1
-            maxReadPosition = PositionImpl.get(position.getLedgerId(), 
position.getEntryId() - 1);
+            maxReadPosition = ((ManagedLedgerImpl) 
topic.getManagedLedger()).getPreviousPosition(position);
         } else {
             maxReadPosition = (PositionImpl) 
topic.getManagedLedger().getLastConfirmedEntry();
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
index 7c74b52951e..9de0888ae5b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
@@ -25,6 +25,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
@@ -40,6 +41,11 @@ import org.apache.pulsar.common.util.FutureUtil;
 @Slf4j
 public class TransactionBufferDisable implements TransactionBuffer {
 
+    private final Topic topic;
+    public TransactionBufferDisable(Topic topic) {
+        this.topic = topic;
+    }
+
     @Override
     public CompletableFuture<TransactionMeta> getTransactionMeta(TxnID txnID) {
         return CompletableFuture.completedFuture(null);
@@ -91,7 +97,7 @@ public class TransactionBufferDisable implements 
TransactionBuffer {
 
     @Override
     public PositionImpl getMaxReadPosition() {
-        return PositionImpl.LATEST;
+        return (PositionImpl) topic.getLastPosition();
     }
 
     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
index aa98fc7d701..6ab56a613c5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
@@ -18,9 +18,18 @@
  */
 package org.apache.pulsar.broker.transaction.buffer;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.when;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertTrue;
+import static org.testng.AssertJUnit.fail;
+import java.util.List;
+import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.BrokerService;
@@ -30,8 +39,13 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
 import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
 import 
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
+import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TopicMessageId;
 import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.TopicMessageIdImpl;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
@@ -179,4 +193,121 @@ public class TopicTransactionBufferTest extends 
TransactionTestBase {
         Assert.assertTrue(f.isCompletedExceptionally());
     }
 
+    /**
+     * This test mainly test the following two point:
+     *      1. `getLastMessageIds` will get max read position.
+     *      Send two message |1:0|1:1|; mock max read position as |1:0|; 
`getLastMessageIds` will get |1:0|.
+     *      2. `getLastMessageIds` will wait Transaction buffer recover 
completely.
+     *      Mock `checkIfTBRecoverCompletely` return an exception, 
`getLastMessageIds` will fail too.
+     *      Mock `checkIfTBRecoverCompletely` return null, `getLastMessageIds` 
will get correct result.
+     */
+    @Test
+    public void testGetMaxPositionAfterTBReady() throws Exception {
+        // 1. Prepare test environment.
+        String topic = "persistent://" + NAMESPACE1 + 
"/testGetMaxReadyPositionAfterTBReady";
+        // 1.1 Mock component.
+        TransactionBuffer transactionBuffer = 
Mockito.spy(TransactionBuffer.class);
+        when(transactionBuffer.checkIfTBRecoverCompletely(anyBoolean()))
+                // Handle producer will check transaction buffer recover 
completely.
+                .thenReturn(CompletableFuture.completedFuture(null))
+                // If the Transaction buffer failed to recover, we can not get 
the correct last max read id.
+                .thenReturn(CompletableFuture.failedFuture(new Throwable("Mock 
fail")))
+                // If the transaction buffer recover successfully, the max 
read position can be acquired successfully.
+                .thenReturn(CompletableFuture.completedFuture(null));
+        TransactionBufferProvider transactionBufferProvider = 
Mockito.spy(TransactionBufferProvider.class);
+        
Mockito.doReturn(transactionBuffer).when(transactionBufferProvider).newTransactionBuffer(any());
+        TransactionBufferProvider originalTBProvider = 
getPulsarServiceList().get(0).getTransactionBufferProvider();
+        
Mockito.doReturn(transactionBufferProvider).when(getPulsarServiceList().get(0)).getTransactionBufferProvider();
+        // 2. Building producer and consumer.
+        admin.topics().createNonPartitionedTopic(topic);
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("sub")
+                .subscribe();
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .create();
+        // 3. Send message and test the exception can be handled as expected.
+        MessageIdImpl messageId = (MessageIdImpl) producer.newMessage().send();
+        producer.newMessage().send();
+        Mockito.doReturn(new PositionImpl(messageId.getLedgerId(), 
messageId.getEntryId()))
+                .when(transactionBuffer).getMaxReadPosition();
+        try {
+            consumer.getLastMessageIds();
+            fail();
+        } catch (PulsarClientException exception) {
+            assertTrue(exception.getMessage().contains("Failed to recover 
Transaction Buffer."));
+        }
+        List<TopicMessageId> messageIdList = consumer.getLastMessageIds();
+        assertEquals(messageIdList.size(), 1);
+        TopicMessageIdImpl actualMessageID = (TopicMessageIdImpl) 
messageIdList.get(0);
+        assertEquals(messageId.getLedgerId(), actualMessageID.getLedgerId());
+        assertEquals(messageId.getEntryId(), actualMessageID.getEntryId());
+        // 4. Clean resource
+        
Mockito.doReturn(originalTBProvider).when(getPulsarServiceList().get(0)).getTransactionBufferProvider();
+    }
+
+    /**
+     * Add a E2E test for the get last message ID. It tests 4 cases.
+     *     <p>
+     *         1. Only normal messages in the topic.
+     *         2. There are ongoing transactions, last message ID will not be 
updated until transaction end.
+     *         3. Aborted transaction will make the last message ID be updated 
as expected.
+     *         4. Committed transaction will make the last message ID be 
updated as expected.
+     *     </p>
+     */
+    @Test
+    public void testGetLastMessageIdsWithOngoingTransactions() throws 
Exception {
+        // 1. Prepare environment
+        String topic = "persistent://" + NAMESPACE1 + 
"/testGetLastMessageIdsWithOngoingTransactions";
+        String subName = "my-subscription";
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .create();
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName(subName)
+                .subscribe();
+
+        // 2. Test last max read position can be required correctly.
+        // 2.1 Case1: send 3 original messages. |1:0|1:1|1:2|
+        MessageIdImpl expectedLastMessageID = null;
+        for (int i = 0; i < 3; i++) {
+            expectedLastMessageID = (MessageIdImpl) 
producer.newMessage().send();
+        }
+        assertMessageId(consumer, expectedLastMessageID, 0);
+        // 2.2 Case2: send 2 ongoing transactional messages and 2 original 
messages.
+        // |1:0|1:1|1:2|txn1->1:3|1:4|txn2->1:5|1:6|.
+        Transaction txn1 = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.HOURS)
+                .build()
+                .get();
+        Transaction txn2 = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.HOURS)
+                .build()
+                .get();
+        producer.newMessage(txn1).send();
+        MessageIdImpl expectedLastMessageID1 = (MessageIdImpl) 
producer.newMessage().send();
+        producer.newMessage(txn2).send();
+        MessageIdImpl expectedLastMessageID2 = (MessageIdImpl) 
producer.newMessage().send();
+        // 2.2.1 Last message ID will not change when txn1 and txn2 do not end.
+        assertMessageId(consumer, expectedLastMessageID, 0);
+        // 2.2.2 Last message ID will update to 1:4 when txn1 committed.
+        txn1.commit().get(5, TimeUnit.SECONDS);
+        assertMessageId(consumer, expectedLastMessageID1, 0);
+        // 2.2.3 Last message ID will update to 1:6 when txn2 aborted.
+        txn2.abort().get(5, TimeUnit.SECONDS);
+        // Todo: We can not ignore the marker's position in this fix.
+        assertMessageId(consumer, expectedLastMessageID2, 2);
+    }
+
+    private void assertMessageId(Consumer<?> consumer, MessageIdImpl expected, 
int entryOffset) throws Exception {
+        TopicMessageIdImpl actual = (TopicMessageIdImpl) 
consumer.getLastMessageIds().get(0);
+        assertEquals(expected.getEntryId(), actual.getEntryId() - entryOffset);
+        assertEquals(expected.getLedgerId(), actual.getLedgerId());
+    }
+
 }

Reply via email to