This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ad00315f61b5b05590e64bbdf9e4406c09854141 Author: Baodi Shi <[email protected]> AuthorDate: Sun Apr 28 20:22:09 2024 +0800 [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState (#22572) (cherry picked from commit a761b97b733142b1ade525e1d1c06785e98face1) --- .../mledger/util/ManagedLedgerImplUtils.java | 82 ++++++++++++++++++++++ .../mledger/util/ManagedLedgerImplUtilsTest.java | 74 +++++++++++++++++++ .../apache/pulsar/broker/service/ServerCnx.java | 45 ++++++------ .../org/apache/pulsar/broker/service/Topic.java | 7 ++ .../broker/service/persistent/PersistentTopic.java | 18 +++++ .../broker/service/ReplicatorSubscriptionTest.java | 77 ++++++++++++++++++++ .../ReplicatorSubscriptionWithTransactionTest.java | 53 ++++++++++++++ .../buffer/TopicTransactionBufferTest.java | 22 +++--- 8 files changed, 348 insertions(+), 30 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java new file mode 100644 index 00000000000..922a6ed11a5 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.util; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Predicate; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.common.classification.InterfaceStability; + +@InterfaceStability.Evolving +public class ManagedLedgerImplUtils { + + /** + * Reverse find last valid position one-entry by one-entry. + */ + public static CompletableFuture<Position> asyncGetLastValidPosition(final ManagedLedgerImpl ledger, + final Predicate<Entry> predicate, + final PositionImpl startPosition) { + CompletableFuture<Position> future = new CompletableFuture<>(); + if (!ledger.isValidPosition(startPosition)) { + future.complete(startPosition); + } else { + internalAsyncReverseFindPositionOneByOne(ledger, predicate, startPosition, future); + } + return future; + } + + private static void internalAsyncReverseFindPositionOneByOne(final ManagedLedgerImpl ledger, + final Predicate<Entry> predicate, + final PositionImpl position, + final CompletableFuture<Position> future) { + ledger.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() { + @Override + public void readEntryComplete(Entry entry, Object ctx) { + final Position position = entry.getPosition(); + try { + if (predicate.test(entry)) { + future.complete(position); + return; + } + PositionImpl previousPosition = ledger.getPreviousPosition((PositionImpl) position); + if (!ledger.isValidPosition(previousPosition)) { + future.complete(previousPosition); + } else { + internalAsyncReverseFindPositionOneByOne(ledger, predicate, + ledger.getPreviousPosition((PositionImpl) 
position), future); + } + } catch (Exception e) { + future.completeExceptionally(e); + } finally { + entry.release(); + } + } + + @Override + public void readEntryFailed(ManagedLedgerException exception, Object ctx) { + future.completeExceptionally(exception); + } + }, null); + } +} diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtilsTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtilsTest.java new file mode 100644 index 00000000000..5b3ea308a9e --- /dev/null +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtilsTest.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.bookkeeper.mledger.util; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.testng.Assert.assertEquals; +import java.nio.charset.StandardCharsets; +import java.util.function.Predicate; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.test.MockedBookKeeperTestCase; +import org.testng.annotations.Test; + +@Slf4j +public class ManagedLedgerImplUtilsTest extends MockedBookKeeperTestCase { + + @Test + public void testGetLastValidPosition() throws Exception { + final int maxEntriesPerLedger = 5; + + ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); + managedLedgerConfig.setMaxEntriesPerLedger(maxEntriesPerLedger); + ManagedLedger ledger = factory.open("testReverseFindPositionOneByOne", managedLedgerConfig); + + String matchEntry = "match-entry"; + String noMatchEntry = "nomatch-entry"; + Predicate<Entry> predicate = entry -> { + String entryValue = entry.getDataBuffer().toString(UTF_8); + return matchEntry.equals(entryValue); + }; + + // New ledger will return the last position, regardless of whether the conditions are met or not. 
+ Position position = ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, + predicate, (PositionImpl) ledger.getLastConfirmedEntry()).get(); + assertEquals(ledger.getLastConfirmedEntry(), position); + + for (int i = 0; i < maxEntriesPerLedger - 1; i++) { + ledger.addEntry(matchEntry.getBytes(StandardCharsets.UTF_8)); + } + Position lastMatchPosition = ledger.addEntry(matchEntry.getBytes(StandardCharsets.UTF_8)); + for (int i = 0; i < maxEntriesPerLedger; i++) { + ledger.addEntry(noMatchEntry.getBytes(StandardCharsets.UTF_8)); + } + + // Returns last position of entry is "match-entry" + position = ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, + predicate, (PositionImpl) ledger.getLastConfirmedEntry()).get(); + assertEquals(position, lastMatchPosition); + + ledger.close(); + } + +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index d118ff0c31e..55cb71cb256 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1927,28 +1927,29 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { long requestId = getLastMessageId.getRequestId(); Topic topic = consumer.getSubscription().getTopic(); - topic.checkIfTransactionBufferRecoverCompletely(true).thenRun(() -> { - Position lastPosition = ((PersistentTopic) topic).getMaxReadPosition(); - int partitionIndex = TopicName.getPartitionIndex(topic.getName()); - - Position markDeletePosition = null; - if (consumer.getSubscription() instanceof PersistentSubscription) { - markDeletePosition = ((PersistentSubscription) consumer.getSubscription()).getCursor() - .getMarkDeletedPosition(); - } - - getLargestBatchIndexWhenPossible( - topic, - (PositionImpl) lastPosition, - (PositionImpl) markDeletePosition, - partitionIndex, - 
requestId, - consumer.getSubscription().getName()); - }).exceptionally(e -> { - ctx.writeAndFlush(Commands.newError(getLastMessageId.getRequestId(), - ServerError.UnknownError, "Failed to recover Transaction Buffer.")); - return null; - }); + topic.checkIfTransactionBufferRecoverCompletely(true) + .thenCompose(__ -> topic.getLastDispatchablePosition()) + .thenApply(lastPosition -> { + int partitionIndex = TopicName.getPartitionIndex(topic.getName()); + + Position markDeletePosition = null; + if (consumer.getSubscription() instanceof PersistentSubscription) { + markDeletePosition = ((PersistentSubscription) consumer.getSubscription()).getCursor() + .getMarkDeletedPosition(); + } + getLargestBatchIndexWhenPossible( + topic, + (PositionImpl) lastPosition, + (PositionImpl) markDeletePosition, + partitionIndex, + requestId, + consumer.getSubscription().getName()); + return null; + }).exceptionally(e -> { + ctx.writeAndFlush(Commands.newError(getLastMessageId.getRequestId(), + ServerError.UnknownError, "Failed to recover Transaction Buffer.")); + return null; + }); } else { ctx.writeAndFlush(Commands.newError(getLastMessageId.getRequestId(), ServerError.MetadataError, "Consumer not found")); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 1605f5503ef..b853b6241d1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -259,6 +259,13 @@ public interface Topic { Position getLastPosition(); + /** + * Get the last message position that can be dispatch. 
+ */ + default CompletableFuture<Position> getLastDispatchablePosition() { + throw new UnsupportedOperationException("getLastDispatchablePosition is not supported by default"); + } + CompletableFuture<MessageId> getLastMessageId(); /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 61654339203..2cf7611be0f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -79,6 +79,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer.CursorInfo; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.mledger.util.ManagedLedgerImplUtils; import org.apache.bookkeeper.net.BookieId; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -155,6 +156,7 @@ import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl; import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl; import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.protocol.Markers; import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.util.Codec; @@ -2983,6 +2985,22 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal return ledger.getLastConfirmedEntry(); } + @Override + public CompletableFuture<Position> getLastDispatchablePosition() { + PositionImpl maxReadPosition = getMaxReadPosition(); + // If `maxReadPosition` is not equal to 
`LastPosition`. It means that there are uncommitted transactions. + // so return `maxRedPosition` directly. + if (maxReadPosition.compareTo((PositionImpl) getLastPosition()) != 0) { + return CompletableFuture.completedFuture(maxReadPosition); + } else { + return ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, entry -> { + MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer()); + // If a messages has marker will filter by AbstractBaseDispatcher.filterEntriesForConsumer + return !Markers.isServerOnlyMarker(md); + }, maxReadPosition); + } + } + @Override public CompletableFuture<MessageId> getLastMessageId() { CompletableFuture<MessageId> completableFuture = new CompletableFuture<>(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java index f3074c457ef..59502cb9d7e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java @@ -48,6 +48,7 @@ import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.policies.data.PartitionedTopicStats; import org.apache.pulsar.common.policies.data.TenantInfoImpl; @@ -160,6 +161,82 @@ public class ReplicatorSubscriptionTest extends ReplicatorTestBase { "messages don't match."); } + /** + * Tests replicated subscriptions across two regions and can read successful. 
+ */ + @Test + public void testReplicatedSubscriptionAcrossTwoRegionsGetLastMessage() throws Exception { + String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscriptionlastmessage"); + String topicName = "persistent://" + namespace + "/mytopic"; + String subscriptionName = "cluster-subscription"; + // this setting can be used to manually run the test with subscription replication disabled + // it shows that subscription replication has no impact in behavior for this test case + boolean replicateSubscriptionState = true; + + admin1.namespaces().createNamespace(namespace); + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2")); + + @Cleanup + PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()) + .statsInterval(0, TimeUnit.SECONDS) + .build(); + + // create subscription in r1 + createReplicatedSubscription(client1, topicName, subscriptionName, replicateSubscriptionState); + + @Cleanup + PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()) + .statsInterval(0, TimeUnit.SECONDS) + .build(); + + // create subscription in r2 + createReplicatedSubscription(client2, topicName, subscriptionName, replicateSubscriptionState); + + Set<String> sentMessages = new LinkedHashSet<>(); + + // send messages in r1 + @Cleanup + Producer<byte[]> producer = client1.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + int numMessages = 6; + for (int i = 0; i < numMessages; i++) { + String body = "message" + i; + producer.send(body.getBytes(StandardCharsets.UTF_8)); + sentMessages.add(body); + } + producer.close(); + + + // consume 3 messages in r1 + Set<String> receivedMessages = new LinkedHashSet<>(); + try (Consumer<byte[]> consumer1 = client1.newConsumer() + .topic(topicName) + .subscriptionName(subscriptionName) + .replicateSubscriptionState(replicateSubscriptionState) + .subscribe()) { + readMessages(consumer1, 
receivedMessages, 3, false); + } + + // wait for subscription to be replicated + Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis()); + + // create a reader in r2 + Reader<byte[]> reader = client2.newReader().topic(topicName) + .subscriptionName("new-sub") + .startMessageId(MessageId.earliest) + .create(); + int readNum = 0; + while (reader.hasMessageAvailable()) { + Message<byte[]> message = reader.readNext(10, TimeUnit.SECONDS); + assertNotNull(message); + log.info("Receive message: " + new String(message.getValue()) + " msgId: " + message.getMessageId()); + readNum++; + } + assertEquals(readNum, numMessages); + } + @Test public void testReplicatedSubscribeAndSwitchToStandbyCluster() throws Exception { final String namespace = BrokerTestUtil.newUniqueName("pulsar/ns_"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionWithTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionWithTransactionTest.java new file mode 100644 index 00000000000..7e588f845b9 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionWithTransactionTest.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +/** + * Tests replicated subscriptions with transaction (PIP-33) + */ +@Test(groups = "broker") +public class ReplicatorSubscriptionWithTransactionTest extends ReplicatorSubscriptionTest { + + @Override + @BeforeClass(timeOut = 300000) + public void setup() throws Exception { + config1.setTransactionCoordinatorEnabled(true); + config2.setTransactionCoordinatorEnabled(true); + config3.setTransactionCoordinatorEnabled(true); + config4.setTransactionCoordinatorEnabled(true); + super.setup(); + } + + @Override + @AfterClass(alwaysRun = true, timeOut = 300000) + public void cleanup() throws Exception { + super.cleanup(); + } + + @DataProvider(name = "isTopicPolicyEnabled") + private Object[][] isTopicPolicyEnabled() { + // Todo: fix replication can not be enabled at topic level. 
+ return new Object[][] { { Boolean.FALSE } }; + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java index cf530d7ce22..6d26fcc0e4b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -273,9 +273,9 @@ public class TopicTransactionBufferTest extends TransactionTestBase { for (int i = 0; i < 3; i++) { expectedLastMessageID = (MessageIdImpl) producer.newMessage().send(); } - assertMessageId(consumer, expectedLastMessageID, 0); + assertMessageId(consumer, expectedLastMessageID); // 2.2 Case2: send 2 ongoing transactional messages and 2 original messages. - // |1:0|1:1|1:2|txn1->1:3|1:4|txn2->1:5|1:6|. + // |1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5|1:6|. Transaction txn1 = pulsarClient.newTransaction() .withTransactionTimeout(5, TimeUnit.HOURS) .build() @@ -285,23 +285,29 @@ public class TopicTransactionBufferTest extends TransactionTestBase { .build() .get(); producer.newMessage(txn1).send(); + // expectedLastMessageID1 == 1:4 MessageIdImpl expectedLastMessageID1 = (MessageIdImpl) producer.newMessage().send(); producer.newMessage(txn2).send(); + // expectedLastMessageID2 == 1:6 MessageIdImpl expectedLastMessageID2 = (MessageIdImpl) producer.newMessage().send(); + // 2.2.1 Last message ID will not change when txn1 and txn2 do not end. - assertMessageId(consumer, expectedLastMessageID, 0); + assertMessageId(consumer, expectedLastMessageID); + // 2.2.2 Last message ID will update to 1:4 when txn1 committed. 
+ // |1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5|1:6|tx1:commit->1:7| txn1.commit().get(5, TimeUnit.SECONDS); - assertMessageId(consumer, expectedLastMessageID1, 0); + assertMessageId(consumer, expectedLastMessageID1); + // 2.2.3 Last message ID will update to 1:6 when txn2 aborted. + // |1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5|1:6|tx1:commit->1:7|tx2:abort->1:8| txn2.abort().get(5, TimeUnit.SECONDS); - // Todo: We can not ignore the marker's position in this fix. - assertMessageId(consumer, expectedLastMessageID2, 2); + assertMessageId(consumer, expectedLastMessageID2); } - private void assertMessageId(Consumer<?> consumer, MessageIdImpl expected, int entryOffset) throws Exception { + private void assertMessageId(Consumer<?> consumer, MessageIdImpl expected) throws Exception { MessageIdImpl msgId = (MessageIdImpl) consumer.getLastMessageId(); - assertEquals(expected.getEntryId(), msgId.getEntryId() - entryOffset); + assertEquals(expected.getEntryId(), msgId.getEntryId()); assertEquals(expected.getLedgerId(), msgId.getLedgerId()); }
