This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new b1f1580d5ac [fix][broker] Reader stuck after call hasMessageAvailable
when enable replicateSubscriptionState (#22572)
b1f1580d5ac is described below
commit b1f1580d5ac08699c2b80fa73937d92dc0809c99
Author: Baodi Shi <[email protected]>
AuthorDate: Sun Apr 28 20:22:09 2024 +0800
[fix][broker] Reader stuck after call hasMessageAvailable when enable
replicateSubscriptionState (#22572)
(cherry picked from commit a761b97b733142b1ade525e1d1c06785e98face1)
---
.../mledger/util/ManagedLedgerImplUtils.java | 82 ++++++++++++++++++++++
.../mledger/util/ManagedLedgerImplUtilsTest.java | 74 +++++++++++++++++++
.../apache/pulsar/broker/service/ServerCnx.java | 48 +++++++------
.../org/apache/pulsar/broker/service/Topic.java | 7 ++
.../broker/service/persistent/PersistentTopic.java | 18 +++++
.../broker/service/ReplicatorSubscriptionTest.java | 77 ++++++++++++++++++++
.../ReplicatorSubscriptionWithTransactionTest.java | 53 ++++++++++++++
.../buffer/TopicTransactionBufferTest.java | 22 +++---
8 files changed, 350 insertions(+), 31 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java
new file mode 100644
index 00000000000..cd8671b0e62
--- /dev/null
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.util;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Predicate;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.common.classification.InterfaceStability;
+
+@InterfaceStability.Evolving
+public class ManagedLedgerImplUtils {
+
+    /**
+     * Searches backwards from {@code startPosition}, one entry at a time, for the most recent entry
+     * accepted by {@code predicate}.
+     *
+     * <p>The returned future completes with:
+     * <ul>
+     *   <li>{@code startPosition} itself when {@code ledger.isValidPosition(startPosition)} is false
+     *       (no entry is read in that case);</li>
+     *   <li>the position of the first entry, walking backwards, for which {@code predicate} returns
+     *       {@code true};</li>
+     *   <li>the first previous position that is not valid, when no entry matched before reaching it.</li>
+     * </ul>
+     * It completes exceptionally when an entry cannot be read or when {@code predicate} throws.
+     *
+     * @param ledger        the managed ledger to read from
+     * @param predicate     test applied to each entry read; every entry is released by this utility
+     *                      after it has been examined
+     * @param startPosition the position at which the backward search begins
+     * @return a future holding the position found by the search
+     */
+    public static CompletableFuture<Position> asyncGetLastValidPosition(final ManagedLedgerImpl ledger,
+                                                                        final Predicate<Entry> predicate,
+                                                                        final PositionImpl startPosition) {
+        CompletableFuture<Position> future = new CompletableFuture<>();
+        if (!ledger.isValidPosition(startPosition)) {
+            future.complete(startPosition);
+        } else {
+            internalAsyncReverseFindPositionOneByOne(ledger, predicate, startPosition, future);
+        }
+        return future;
+    }
+
+    /**
+     * Reads the entry at {@code position}, then either completes {@code future} or continues the
+     * search at the previous position.
+     */
+    private static void internalAsyncReverseFindPositionOneByOne(final ManagedLedgerImpl ledger,
+                                                                 final Predicate<Entry> predicate,
+                                                                 final PositionImpl position,
+                                                                 final CompletableFuture<Position> future) {
+        ledger.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
+            @Override
+            public void readEntryComplete(Entry entry, Object ctx) {
+                // Everything that touches the entry stays inside the try block so that any failure
+                // both fails the future and releases the entry.
+                try {
+                    final Position entryPosition = entry.getPosition();
+                    if (predicate.test(entry)) {
+                        future.complete(entryPosition);
+                        return;
+                    }
+                    // Computed once: the position that is validated is the very one that is read next.
+                    final PositionImpl previousPosition =
+                            ledger.getPreviousPosition((PositionImpl) entryPosition);
+                    if (ledger.isValidPosition(previousPosition)) {
+                        internalAsyncReverseFindPositionOneByOne(ledger, predicate, previousPosition, future);
+                    } else {
+                        future.complete(previousPosition);
+                    }
+                } catch (Exception e) {
+                    future.completeExceptionally(e);
+                } finally {
+                    entry.release();
+                }
+            }
+
+            @Override
+            public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+                future.completeExceptionally(exception);
+            }
+        }, null);
+    }
+}
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtilsTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtilsTest.java
new file mode 100644
index 00000000000..f13d23c0529
--- /dev/null
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtilsTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.util;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.testng.Assert.assertEquals;
+import java.nio.charset.StandardCharsets;
+import java.util.function.Predicate;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class ManagedLedgerImplUtilsTest extends MockedBookKeeperTestCase {
+
+    /**
+     * Checks the backward search on a freshly opened ledger and on a ledger whose trailing
+     * entries do not satisfy the predicate.
+     */
+    @Test
+    public void testGetLastValidPosition() throws Exception {
+        final int maxEntriesPerLedger = 5;
+        final String matchEntry = "match-entry";
+        final String noMatchEntry = "nomatch-entry";
+
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(maxEntriesPerLedger);
+        ManagedLedger ledger = factory.open("testReverseFindPositionOneByOne", config);
+
+        Predicate<Entry> isMatchEntry = entry -> matchEntry.equals(entry.getDataBuffer().toString(UTF_8));
+
+        // On a freshly opened ledger the start position comes back unchanged, whether or not anything matches.
+        Position found = ManagedLedgerImplUtils.asyncGetLastValidPosition(
+                (ManagedLedgerImpl) ledger, isMatchEntry, (PositionImpl) ledger.getLastConfirmedEntry()).get();
+        assertEquals(ledger.getLastConfirmedEntry(), found);
+
+        // Write maxEntriesPerLedger matching entries, remembering the position of the last one ...
+        Position lastMatchPosition = null;
+        for (int i = 0; i < maxEntriesPerLedger; i++) {
+            lastMatchPosition = ledger.addEntry(matchEntry.getBytes(StandardCharsets.UTF_8));
+        }
+        // ... followed by maxEntriesPerLedger entries that do not match.
+        for (int i = 0; i < maxEntriesPerLedger; i++) {
+            ledger.addEntry(noMatchEntry.getBytes(StandardCharsets.UTF_8));
+        }
+
+        // The search has to skip the non-matching tail and stop at the last "match-entry".
+        found = ManagedLedgerImplUtils.asyncGetLastValidPosition(
+                (ManagedLedgerImpl) ledger, isMatchEntry, (PositionImpl) ledger.getLastConfirmedEntry()).get();
+        assertEquals(found, lastMatchPosition);
+
+        ledger.close();
+    }
+
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 40a0ee4a73c..c7d68a30e72 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2164,29 +2164,31 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
long requestId = getLastMessageId.getRequestId();
Topic topic = consumer.getSubscription().getTopic();
- topic.checkIfTransactionBufferRecoverCompletely(true).thenRun(()
-> {
- Position lastPosition = ((PersistentTopic)
topic).getMaxReadPosition();
- int partitionIndex =
TopicName.getPartitionIndex(topic.getName());
-
- Position markDeletePosition = null;
- if (consumer.getSubscription() instanceof
PersistentSubscription) {
- markDeletePosition = ((PersistentSubscription)
consumer.getSubscription()).getCursor()
- .getMarkDeletedPosition();
- }
-
- getLargestBatchIndexWhenPossible(
- topic,
- (PositionImpl) lastPosition,
- (PositionImpl) markDeletePosition,
- partitionIndex,
- requestId,
- consumer.getSubscription().getName(),
- consumer.readCompacted());
- }).exceptionally(e -> {
-
writeAndFlush(Commands.newError(getLastMessageId.getRequestId(),
- ServerError.UnknownError, "Failed to recover
Transaction Buffer."));
- return null;
- });
+ topic.checkIfTransactionBufferRecoverCompletely(true)
+ .thenCompose(__ -> topic.getLastDispatchablePosition())
+ .thenApply(lastPosition -> {
+ int partitionIndex =
TopicName.getPartitionIndex(topic.getName());
+
+ Position markDeletePosition = null;
+ if (consumer.getSubscription() instanceof
PersistentSubscription) {
+ markDeletePosition = ((PersistentSubscription)
consumer.getSubscription()).getCursor()
+ .getMarkDeletedPosition();
+ }
+
+ getLargestBatchIndexWhenPossible(
+ topic,
+ (PositionImpl) lastPosition,
+ (PositionImpl) markDeletePosition,
+ partitionIndex,
+ requestId,
+ consumer.getSubscription().getName(),
+ consumer.readCompacted());
+ return null;
+ }).exceptionally(e -> {
+
writeAndFlush(Commands.newError(getLastMessageId.getRequestId(),
+ ServerError.UnknownError, "Failed to recover
Transaction Buffer."));
+ return null;
+ });
} else {
writeAndFlush(Commands.newError(getLastMessageId.getRequestId(),
ServerError.MetadataError, "Consumer not found"));
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 343aef09c1c..1da8cfce4ee 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -268,6 +268,13 @@ public interface Topic {
Position getLastPosition();
+ /**
+ * Get the last message position that can be dispatched.
+ *
+ * <p>This default implementation does not compute a position: it throws immediately rather than
+ * returning a failed future.
+ *
+ * @return a future holding the last dispatchable position
+ * @throws UnsupportedOperationException always, in this default implementation
+ */
+ default CompletableFuture<Position> getLastDispatchablePosition() {
+ throw new UnsupportedOperationException("getLastDispatchablePosition
is not supported by default");
+ }
+
CompletableFuture<MessageId> getLastMessageId();
/**
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 3a1da8ccee7..cce77013030 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -78,6 +78,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.impl.ShadowManagedLedgerImpl;
import org.apache.bookkeeper.mledger.util.Futures;
+import org.apache.bookkeeper.mledger.util.ManagedLedgerImplUtils;
import org.apache.bookkeeper.net.BookieId;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -169,6 +170,7 @@ import
org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaType;
@@ -3412,6 +3414,22 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
return ledger.getLastConfirmedEntry();
}
+    /**
+     * Resolves the last position that can be dispatched.
+     *
+     * @return a future holding the max read position when it differs from the last position; otherwise
+     *         the result of searching backwards from the max read position for an entry that is not a
+     *         server-only marker
+     */
+    @Override
+    public CompletableFuture<Position> getLastDispatchablePosition() {
+        final PositionImpl maxReadPosition = getMaxReadPosition();
+        final PositionImpl lastPosition = (PositionImpl) getLastPosition();
+        // If `maxReadPosition` differs from the last position there are uncommitted transactions,
+        // so `maxReadPosition` is returned directly.
+        if (maxReadPosition.compareTo(lastPosition) != 0) {
+            return CompletableFuture.completedFuture(maxReadPosition);
+        }
+        // Marker messages are filtered out by AbstractBaseDispatcher.filterEntriesForConsumer,
+        // so server-only markers are skipped while searching backwards.
+        return ManagedLedgerImplUtils.asyncGetLastValidPosition(
+                (ManagedLedgerImpl) ledger,
+                entry -> {
+                    MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
+                    return !Markers.isServerOnlyMarker(metadata);
+                },
+                maxReadPosition);
+    }
+
@Override
public CompletableFuture<MessageId> getLastMessageId() {
CompletableFuture<MessageId> completableFuture = new
CompletableFuture<>();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
index 4cc3a9ada7d..bfafdf89ed6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
@@ -51,6 +51,7 @@ import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
@@ -167,6 +168,82 @@ public class ReplicatorSubscriptionTest extends
ReplicatorTestBase {
"messages don't match.");
}
+    /**
+     * Tests that, with a replicated subscription across two regions, a reader created in the second
+     * region reads until {@code hasMessageAvailable()} reports no more messages and receives exactly
+     * the number of messages that were produced.
+     */
+    @Test
+    public void testReplicatedSubscriptionAcrossTwoRegionsGetLastMessage() throws Exception {
+        String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscriptionlastmessage");
+        String topicName = "persistent://" + namespace + "/mytopic";
+        String subscriptionName = "cluster-subscription";
+        // flip to false to run the same scenario manually with subscription replication disabled
+        boolean replicateSubscriptionState = true;
+
+        admin1.namespaces().createNamespace(namespace);
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
+
+        @Cleanup
+        PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
+
+        // create subscription in r1
+        createReplicatedSubscription(client1, topicName, subscriptionName, replicateSubscriptionState);
+
+        @Cleanup
+        PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
+
+        // create subscription in r2
+        createReplicatedSubscription(client2, topicName, subscriptionName, replicateSubscriptionState);
+
+        // send messages in r1
+        @Cleanup
+        Producer<byte[]> producer = client1.newProducer().topic(topicName)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+        int numMessages = 6;
+        for (int i = 0; i < numMessages; i++) {
+            String body = "message" + i;
+            producer.send(body.getBytes(StandardCharsets.UTF_8));
+        }
+        producer.close();
+
+        // consume 3 messages in r1
+        Set<String> receivedMessages = new LinkedHashSet<>();
+        try (Consumer<byte[]> consumer1 = client1.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subscriptionName)
+                .replicateSubscriptionState(replicateSubscriptionState)
+                .subscribe()) {
+            readMessages(consumer1, receivedMessages, 3, false);
+        }
+
+        // wait for subscription to be replicated
+        Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
+
+        // create a reader in r2; @Cleanup closes it even when an assertion below fails
+        @Cleanup
+        Reader<byte[]> reader = client2.newReader().topic(topicName)
+                .subscriptionName("new-sub")
+                .startMessageId(MessageId.earliest)
+                .create();
+        int readNum = 0;
+        while (reader.hasMessageAvailable()) {
+            Message<byte[]> message = reader.readNext(10, TimeUnit.SECONDS);
+            assertNotNull(message);
+            // decode with the same charset that was used to produce the payload
+            log.info("Receive message: " + new String(message.getValue(), StandardCharsets.UTF_8)
+                    + " msgId: " + message.getMessageId());
+            readNum++;
+        }
+        assertEquals(readNum, numMessages);
+    }
+
@Test
public void testReplicatedSubscribeAndSwitchToStandbyCluster() throws
Exception {
final String namespace = BrokerTestUtil.newUniqueName("pulsar/ns_");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionWithTransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionWithTransactionTest.java
new file mode 100644
index 00000000000..93a22a851f1
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionWithTransactionTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Runs the inherited replicated-subscription tests (PIP-33) with the transaction coordinator
+ * enabled on config1 through config4.
+ */
+@Test(groups = "broker")
+public class ReplicatorSubscriptionWithTransactionTest extends ReplicatorSubscriptionTest {
+
+    /** Turns the transaction coordinator on for each broker config, then runs the parent setup. */
+    @Override
+    @BeforeClass(timeOut = 300000)
+    public void setup() throws Exception {
+        final boolean transactionCoordinatorEnabled = true;
+        config1.setTransactionCoordinatorEnabled(transactionCoordinatorEnabled);
+        config2.setTransactionCoordinatorEnabled(transactionCoordinatorEnabled);
+        config3.setTransactionCoordinatorEnabled(transactionCoordinatorEnabled);
+        config4.setTransactionCoordinatorEnabled(transactionCoordinatorEnabled);
+        super.setup();
+    }
+
+    /** Delegates to the parent cleanup; redeclared so the annotation applies to this class. */
+    @Override
+    @AfterClass(alwaysRun = true, timeOut = 300000)
+    public void cleanup() throws Exception {
+        super.cleanup();
+    }
+
+    /** Supplies only the "topic policy disabled" case. */
+    @DataProvider(name = "isTopicPolicyEnabled")
+    private Object[][] isTopicPolicyEnabled() {
+        // Todo: fix replication can not be enabled at topic level.
+        return new Object[][] {{false}};
+    }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
index fad785cc882..b0903b00be3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
@@ -280,9 +280,9 @@ public class TopicTransactionBufferTest extends
TransactionTestBase {
for (int i = 0; i < 3; i++) {
expectedLastMessageID = (MessageIdImpl)
producer.newMessage().send();
}
- assertMessageId(consumer, expectedLastMessageID, 0);
+ assertMessageId(consumer, expectedLastMessageID);
// 2.2 Case2: send 2 ongoing transactional messages and 2 original
messages.
- // |1:0|1:1|1:2|txn1->1:3|1:4|txn2->1:5|1:6|.
+ // |1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5|1:6|.
Transaction txn1 = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.HOURS)
.build()
@@ -292,18 +292,24 @@ public class TopicTransactionBufferTest extends
TransactionTestBase {
.build()
.get();
producer.newMessage(txn1).send();
+ // expectedLastMessageID1 == 1:4
MessageIdImpl expectedLastMessageID1 = (MessageIdImpl)
producer.newMessage().send();
producer.newMessage(txn2).send();
+ // expectedLastMessageID2 == 1:6
MessageIdImpl expectedLastMessageID2 = (MessageIdImpl)
producer.newMessage().send();
+
// 2.2.1 Last message ID will not change when txn1 and txn2 do not end.
- assertMessageId(consumer, expectedLastMessageID, 0);
+ assertMessageId(consumer, expectedLastMessageID);
+
// 2.2.2 Last message ID will update to 1:4 when txn1 committed.
+ //
|1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5|1:6|tx1:commit->1:7|
txn1.commit().get(5, TimeUnit.SECONDS);
- assertMessageId(consumer, expectedLastMessageID1, 0);
+ assertMessageId(consumer, expectedLastMessageID1);
+
// 2.2.3 Last message ID will update to 1:6 when txn2 aborted.
+ //
|1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5|1:6|tx1:commit->1:7|tx2:abort->1:8|
txn2.abort().get(5, TimeUnit.SECONDS);
- // Todo: We can not ignore the marker's position in this fix.
- assertMessageId(consumer, expectedLastMessageID2, 2);
+ assertMessageId(consumer, expectedLastMessageID2);
}
/**
@@ -362,9 +368,9 @@ public class TopicTransactionBufferTest extends
TransactionTestBase {
});
}
- private void assertMessageId(Consumer<?> consumer, MessageIdImpl expected,
int entryOffset) throws Exception {
+ private void assertMessageId(Consumer<?> consumer, MessageIdImpl expected)
throws Exception {
TopicMessageIdImpl actual = (TopicMessageIdImpl)
consumer.getLastMessageIds().get(0);
- assertEquals(expected.getEntryId(), actual.getEntryId() - entryOffset);
+ assertEquals(expected.getEntryId(), actual.getEntryId());
assertEquals(expected.getLedgerId(), actual.getLedgerId());
}