This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new b1f1580d5ac [fix][broker] Reader stuck after call hasMessageAvailable 
when enable replicateSubscriptionState (#22572)
b1f1580d5ac is described below

commit b1f1580d5ac08699c2b80fa73937d92dc0809c99
Author: Baodi Shi <[email protected]>
AuthorDate: Sun Apr 28 20:22:09 2024 +0800

    [fix][broker] Reader stuck after call hasMessageAvailable when enable 
replicateSubscriptionState (#22572)
    
    (cherry picked from commit a761b97b733142b1ade525e1d1c06785e98face1)
---
 .../mledger/util/ManagedLedgerImplUtils.java       | 82 ++++++++++++++++++++++
 .../mledger/util/ManagedLedgerImplUtilsTest.java   | 74 +++++++++++++++++++
 .../apache/pulsar/broker/service/ServerCnx.java    | 48 +++++++------
 .../org/apache/pulsar/broker/service/Topic.java    |  7 ++
 .../broker/service/persistent/PersistentTopic.java | 18 +++++
 .../broker/service/ReplicatorSubscriptionTest.java | 77 ++++++++++++++++++++
 .../ReplicatorSubscriptionWithTransactionTest.java | 53 ++++++++++++++
 .../buffer/TopicTransactionBufferTest.java         | 22 +++---
 8 files changed, 350 insertions(+), 31 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java
new file mode 100644
index 00000000000..cd8671b0e62
--- /dev/null
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.util;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Predicate;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.common.classification.InterfaceStability;
+
+@InterfaceStability.Evolving
+public class ManagedLedgerImplUtils {
+
+    /**
+     * Find the last valid position by searching backwards, one entry at a time.
+     */
+    public static CompletableFuture<Position> asyncGetLastValidPosition(final 
ManagedLedgerImpl ledger,
+                                                                        final 
Predicate<Entry> predicate,
+                                                                        final 
PositionImpl startPosition) {
+        CompletableFuture<Position> future = new CompletableFuture<>();
+        if (!ledger.isValidPosition(startPosition)) {
+            future.complete(startPosition);
+        } else {
+            internalAsyncReverseFindPositionOneByOne(ledger, predicate, 
startPosition, future);
+        }
+        return future;
+    }
+
+    private static void internalAsyncReverseFindPositionOneByOne(final 
ManagedLedgerImpl ledger,
+                                                                 final 
Predicate<Entry> predicate,
+                                                                 final 
PositionImpl position,
+                                                                 final 
CompletableFuture<Position> future) {
+        ledger.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() 
{
+            @Override
+            public void readEntryComplete(Entry entry, Object ctx) {
+                final Position position = entry.getPosition();
+                try {
+                    if (predicate.test(entry)) {
+                        future.complete(position);
+                        return;
+                    }
+                    PositionImpl previousPosition = 
ledger.getPreviousPosition((PositionImpl) position);
+                    if (!ledger.isValidPosition(previousPosition)) {
+                        future.complete(previousPosition);
+                    } else {
+                        internalAsyncReverseFindPositionOneByOne(ledger, 
predicate,
+                                ledger.getPreviousPosition((PositionImpl) 
position), future);
+                    }
+                } catch (Exception e) {
+                    future.completeExceptionally(e);
+                } finally {
+                    entry.release();
+                }
+            }
+
+            @Override
+            public void readEntryFailed(ManagedLedgerException exception, 
Object ctx) {
+                future.completeExceptionally(exception);
+            }
+        }, null);
+    }
+}
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtilsTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtilsTest.java
new file mode 100644
index 00000000000..f13d23c0529
--- /dev/null
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtilsTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.util;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.testng.Assert.assertEquals;
+import java.nio.charset.StandardCharsets;
+import java.util.function.Predicate;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class ManagedLedgerImplUtilsTest extends MockedBookKeeperTestCase {
+
+    @Test
+    public void testGetLastValidPosition() throws Exception {
+        final int maxEntriesPerLedger = 5;
+
+        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        managedLedgerConfig.setMaxEntriesPerLedger(maxEntriesPerLedger);
+        ManagedLedger ledger = factory.open("testReverseFindPositionOneByOne", 
managedLedgerConfig);
+
+        String matchEntry = "match-entry";
+        String noMatchEntry = "nomatch-entry";
+        Predicate<Entry> predicate = entry -> {
+            String entryValue = entry.getDataBuffer().toString(UTF_8);
+            return matchEntry.equals(entryValue);
+        };
+
+        // New ledger will return the last position, regardless of whether the 
conditions are met or not.
+        Position position = 
ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, 
+                predicate, (PositionImpl) 
ledger.getLastConfirmedEntry()).get();
+        assertEquals(ledger.getLastConfirmedEntry(), position);
+
+        for (int i = 0; i < maxEntriesPerLedger - 1; i++) {
+            ledger.addEntry(matchEntry.getBytes(StandardCharsets.UTF_8));
+        }
+        Position lastMatchPosition = 
ledger.addEntry(matchEntry.getBytes(StandardCharsets.UTF_8));
+        for (int i = 0; i < maxEntriesPerLedger; i++) {
+            ledger.addEntry(noMatchEntry.getBytes(StandardCharsets.UTF_8));
+        }
+
+        // Returns the position of the last entry whose value is "match-entry".
+        position = 
ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger,
+                predicate, (PositionImpl) 
ledger.getLastConfirmedEntry()).get();
+        assertEquals(position, lastMatchPosition);
+
+        ledger.close();
+    }
+
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 40a0ee4a73c..c7d68a30e72 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2164,29 +2164,31 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             long requestId = getLastMessageId.getRequestId();
 
             Topic topic = consumer.getSubscription().getTopic();
-            topic.checkIfTransactionBufferRecoverCompletely(true).thenRun(() 
-> {
-                Position lastPosition = ((PersistentTopic) 
topic).getMaxReadPosition();
-                int partitionIndex = 
TopicName.getPartitionIndex(topic.getName());
-
-                Position markDeletePosition = null;
-                if (consumer.getSubscription() instanceof 
PersistentSubscription) {
-                    markDeletePosition = ((PersistentSubscription) 
consumer.getSubscription()).getCursor()
-                            .getMarkDeletedPosition();
-                }
-
-                getLargestBatchIndexWhenPossible(
-                        topic,
-                        (PositionImpl) lastPosition,
-                        (PositionImpl) markDeletePosition,
-                        partitionIndex,
-                        requestId,
-                        consumer.getSubscription().getName(),
-                        consumer.readCompacted());
-            }).exceptionally(e -> {
-                
writeAndFlush(Commands.newError(getLastMessageId.getRequestId(),
-                        ServerError.UnknownError, "Failed to recover 
Transaction Buffer."));
-                return null;
-            });
+            topic.checkIfTransactionBufferRecoverCompletely(true)
+                 .thenCompose(__ -> topic.getLastDispatchablePosition())
+                 .thenApply(lastPosition -> {
+                     int partitionIndex = 
TopicName.getPartitionIndex(topic.getName());
+
+                     Position markDeletePosition = null;
+                     if (consumer.getSubscription() instanceof 
PersistentSubscription) {
+                         markDeletePosition = ((PersistentSubscription) 
consumer.getSubscription()).getCursor()
+                                 .getMarkDeletedPosition();
+                     }
+
+                     getLargestBatchIndexWhenPossible(
+                             topic,
+                             (PositionImpl) lastPosition,
+                             (PositionImpl) markDeletePosition,
+                             partitionIndex,
+                             requestId,
+                             consumer.getSubscription().getName(),
+                             consumer.readCompacted());
+                    return null;
+                 }).exceptionally(e -> {
+                     
writeAndFlush(Commands.newError(getLastMessageId.getRequestId(),
+                             ServerError.UnknownError, "Failed to recover 
Transaction Buffer."));
+                     return null;
+                 });
         } else {
             writeAndFlush(Commands.newError(getLastMessageId.getRequestId(),
                     ServerError.MetadataError, "Consumer not found"));
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 343aef09c1c..1da8cfce4ee 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -268,6 +268,13 @@ public interface Topic {
 
     Position getLastPosition();
 
+    /**
+     * Get the last message position that can be dispatched.
+     */
+    default CompletableFuture<Position> getLastDispatchablePosition() {
+        throw new UnsupportedOperationException("getLastDispatchablePosition 
is not supported by default");
+    }
+
     CompletableFuture<MessageId> getLastMessageId();
 
     /**
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 3a1da8ccee7..cce77013030 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -78,6 +78,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.mledger.impl.ShadowManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.util.Futures;
+import org.apache.bookkeeper.mledger.util.ManagedLedgerImplUtils;
 import org.apache.bookkeeper.net.BookieId;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -169,6 +170,7 @@ import 
org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
 import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
 import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.Markers;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -3412,6 +3414,22 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         return ledger.getLastConfirmedEntry();
     }
 
+    @Override
+    public CompletableFuture<Position> getLastDispatchablePosition() {
+        PositionImpl maxReadPosition = getMaxReadPosition();
+        // If `maxReadPosition` is not equal to `LastPosition`, it means that 
there are uncommitted transactions,
+        // so return `maxReadPosition` directly.
+        if (maxReadPosition.compareTo((PositionImpl) getLastPosition()) != 0) {
+            return CompletableFuture.completedFuture(maxReadPosition);
+        } else {
+            return 
ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, 
entry -> {
+                MessageMetadata md = 
Commands.parseMessageMetadata(entry.getDataBuffer());
+                // If a message has a marker, it will be filtered by 
AbstractBaseDispatcher.filterEntriesForConsumer
+                return !Markers.isServerOnlyMarker(md);
+            }, maxReadPosition);
+        }
+    }
+
     @Override
     public CompletableFuture<MessageId> getLastMessageId() {
         CompletableFuture<MessageId> completableFuture = new 
CompletableFuture<>();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
index 4cc3a9ada7d..bfafdf89ed6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
@@ -51,6 +51,7 @@ import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
@@ -167,6 +168,82 @@ public class ReplicatorSubscriptionTest extends 
ReplicatorTestBase {
                 "messages don't match.");
     }
 
+    /**
+     * Tests replicated subscriptions across two regions and verifies that a reader can read 
successfully.
+     */
+    @Test
+    public void testReplicatedSubscriptionAcrossTwoRegionsGetLastMessage() 
throws Exception {
+        String namespace = 
BrokerTestUtil.newUniqueName("pulsar/replicatedsubscriptionlastmessage");
+        String topicName = "persistent://" + namespace + "/mytopic";
+        String subscriptionName = "cluster-subscription";
+        // this setting can be used to manually run the test with subscription 
replication disabled
+        // it shows that subscription replication has no impact in behavior 
for this test case
+        boolean replicateSubscriptionState = true;
+
+        admin1.namespaces().createNamespace(namespace);
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, 
Sets.newHashSet("r1", "r2"));
+
+        @Cleanup
+        PulsarClient client1 = 
PulsarClient.builder().serviceUrl(url1.toString())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
+
+        // create subscription in r1
+        createReplicatedSubscription(client1, topicName, subscriptionName, 
replicateSubscriptionState);
+
+        @Cleanup
+        PulsarClient client2 = 
PulsarClient.builder().serviceUrl(url2.toString())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
+
+        // create subscription in r2
+        createReplicatedSubscription(client2, topicName, subscriptionName, 
replicateSubscriptionState);
+
+        Set<String> sentMessages = new LinkedHashSet<>();
+
+        // send messages in r1
+        @Cleanup
+        Producer<byte[]> producer = client1.newProducer().topic(topicName)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+        int numMessages = 6;
+        for (int i = 0; i < numMessages; i++) {
+            String body = "message" + i;
+            producer.send(body.getBytes(StandardCharsets.UTF_8));
+            sentMessages.add(body);
+        }
+        producer.close();
+
+
+        // consume 3 messages in r1
+        Set<String> receivedMessages = new LinkedHashSet<>();
+        try (Consumer<byte[]> consumer1 = client1.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subscriptionName)
+                .replicateSubscriptionState(replicateSubscriptionState)
+                .subscribe()) {
+            readMessages(consumer1, receivedMessages, 3, false);
+        }
+
+        // wait for subscription to be replicated
+        Thread.sleep(2 * 
config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
+
+        // create a reader in r2
+        Reader<byte[]> reader = client2.newReader().topic(topicName)
+                .subscriptionName("new-sub")
+                .startMessageId(MessageId.earliest)
+                .create();
+        int readNum = 0;
+        while (reader.hasMessageAvailable()) {
+            Message<byte[]> message = reader.readNext(10, TimeUnit.SECONDS);
+            assertNotNull(message);
+            log.info("Receive message: " + new String(message.getValue()) + " 
msgId: " + message.getMessageId());
+            readNum++;
+        }
+        assertEquals(readNum, numMessages);
+    }
+
     @Test
     public void testReplicatedSubscribeAndSwitchToStandbyCluster() throws 
Exception {
         final String namespace = BrokerTestUtil.newUniqueName("pulsar/ns_");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionWithTransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionWithTransactionTest.java
new file mode 100644
index 00000000000..93a22a851f1
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionWithTransactionTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Tests replicated subscriptions with transaction (PIP-33)
+ */
+@Test(groups = "broker")
+public class ReplicatorSubscriptionWithTransactionTest extends 
ReplicatorSubscriptionTest {
+
+    @Override
+    @BeforeClass(timeOut = 300000)
+    public void setup() throws Exception {
+        config1.setTransactionCoordinatorEnabled(true);
+        config2.setTransactionCoordinatorEnabled(true);
+        config3.setTransactionCoordinatorEnabled(true);
+        config4.setTransactionCoordinatorEnabled(true);
+        super.setup();
+    }
+
+    @Override
+    @AfterClass(alwaysRun = true, timeOut = 300000)
+    public void cleanup() throws Exception {
+        super.cleanup();
+    }
+
+    @DataProvider(name = "isTopicPolicyEnabled")
+    private Object[][] isTopicPolicyEnabled() {
+        // Todo: fix replication can not be enabled at topic level.
+        return new Object[][] { { Boolean.FALSE } };
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
index fad785cc882..b0903b00be3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
@@ -280,9 +280,9 @@ public class TopicTransactionBufferTest extends 
TransactionTestBase {
         for (int i = 0; i < 3; i++) {
             expectedLastMessageID = (MessageIdImpl) 
producer.newMessage().send();
         }
-        assertMessageId(consumer, expectedLastMessageID, 0);
+        assertMessageId(consumer, expectedLastMessageID);
         // 2.2 Case2: send 2 ongoing transactional messages and 2 original 
messages.
-        // |1:0|1:1|1:2|txn1->1:3|1:4|txn2->1:5|1:6|.
+        // |1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5|1:6|.
         Transaction txn1 = pulsarClient.newTransaction()
                 .withTransactionTimeout(5, TimeUnit.HOURS)
                 .build()
@@ -292,18 +292,24 @@ public class TopicTransactionBufferTest extends 
TransactionTestBase {
                 .build()
                 .get();
         producer.newMessage(txn1).send();
+        // expectedLastMessageID1 == 1:4
         MessageIdImpl expectedLastMessageID1 = (MessageIdImpl) 
producer.newMessage().send();
         producer.newMessage(txn2).send();
+        // expectedLastMessageID2 == 1:6
         MessageIdImpl expectedLastMessageID2 = (MessageIdImpl) 
producer.newMessage().send();
+
         // 2.2.1 Last message ID will not change when txn1 and txn2 do not end.
-        assertMessageId(consumer, expectedLastMessageID, 0);
+        assertMessageId(consumer, expectedLastMessageID);
+
         // 2.2.2 Last message ID will update to 1:4 when txn1 committed.
+        // 
|1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5|1:6|tx1:commit->1:7|
         txn1.commit().get(5, TimeUnit.SECONDS);
-        assertMessageId(consumer, expectedLastMessageID1, 0);
+        assertMessageId(consumer, expectedLastMessageID1);
+
         // 2.2.3 Last message ID will update to 1:6 when txn2 aborted.
+        // 
|1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5|1:6|tx1:commit->1:7|tx2:abort->1:8|
         txn2.abort().get(5, TimeUnit.SECONDS);
-        // Todo: We can not ignore the marker's position in this fix.
-        assertMessageId(consumer, expectedLastMessageID2, 2);
+        assertMessageId(consumer, expectedLastMessageID2);
     }
 
     /**
@@ -362,9 +368,9 @@ public class TopicTransactionBufferTest extends 
TransactionTestBase {
         });
     }
 
-    private void assertMessageId(Consumer<?> consumer, MessageIdImpl expected, 
int entryOffset) throws Exception {
+    private void assertMessageId(Consumer<?> consumer, MessageIdImpl expected) 
throws Exception {
         TopicMessageIdImpl actual = (TopicMessageIdImpl) 
consumer.getLastMessageIds().get(0);
-        assertEquals(expected.getEntryId(), actual.getEntryId() - entryOffset);
+        assertEquals(expected.getEntryId(), actual.getEntryId());
         assertEquals(expected.getLedgerId(), actual.getLedgerId());
     }
 

Reply via email to