This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 78cda11de37601c41e9a67fc1f36b1a21992f1c1 Author: Xiangying Meng <[email protected]> AuthorDate: Wed Dec 13 10:21:11 2023 +0800 [fix][txn] Fix getting last message ID when there are ongoing transactions (#21466) (cherry picked from commit 50007c343ad911edf5654786a7e3a1fc10901091) --- .../apache/pulsar/broker/service/ServerCnx.java | 37 +++--- .../broker/service/persistent/PersistentTopic.java | 4 +- .../buffer/impl/InMemTransactionBuffer.java | 5 +- .../buffer/impl/TopicTransactionBuffer.java | 3 +- .../buffer/impl/TransactionBufferDisable.java | 8 +- .../buffer/TopicTransactionBufferTest.java | 126 +++++++++++++++++++++ 6 files changed, 161 insertions(+), 22 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 224a4489478..d118ff0c31e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1927,23 +1927,28 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { long requestId = getLastMessageId.getRequestId(); Topic topic = consumer.getSubscription().getTopic(); - Position lastPosition = topic.getLastPosition(); - int partitionIndex = TopicName.getPartitionIndex(topic.getName()); - - Position markDeletePosition = null; - if (consumer.getSubscription() instanceof PersistentSubscription) { - markDeletePosition = ((PersistentSubscription) consumer.getSubscription()).getCursor() - .getMarkDeletedPosition(); - } - - getLargestBatchIndexWhenPossible( - topic, - (PositionImpl) lastPosition, - (PositionImpl) markDeletePosition, - partitionIndex, - requestId, - consumer.getSubscription().getName()); + topic.checkIfTransactionBufferRecoverCompletely(true).thenRun(() -> { + Position lastPosition = ((PersistentTopic) topic).getMaxReadPosition(); + int partitionIndex = TopicName.getPartitionIndex(topic.getName()); + + Position markDeletePosition = null; + if (consumer.getSubscription() instanceof PersistentSubscription) { + markDeletePosition = ((PersistentSubscription) consumer.getSubscription()).getCursor() + .getMarkDeletedPosition(); + } + getLargestBatchIndexWhenPossible( + topic, + (PositionImpl) lastPosition, + (PositionImpl) markDeletePosition, + partitionIndex, + requestId, + consumer.getSubscription().getName()); + }).exceptionally(e -> { + ctx.writeAndFlush(Commands.newError(getLastMessageId.getRequestId(), + ServerError.UnknownError, "Failed to recover Transaction Buffer.")); + return null; + }); } else { ctx.writeAndFlush(Commands.newError(getLastMessageId.getRequestId(), ServerError.MetadataError, "Consumer not found")); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 71696007049..61654339203 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -330,7 +330,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal this.transactionBuffer = brokerService.getPulsar() .getTransactionBufferProvider().newTransactionBuffer(this); } else { - this.transactionBuffer = new TransactionBufferDisable(); + this.transactionBuffer = new TransactionBufferDisable(this); } transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry()); } @@ -406,7 +406,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal this.transactionBuffer = brokerService.getPulsar() .getTransactionBufferProvider().newTransactionBuffer(this); } else { - this.transactionBuffer = new TransactionBufferDisable(); + this.transactionBuffer = new TransactionBufferDisable(this); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java index c4a9fc2b774..7bd9405fd6b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java @@ -211,9 +211,12 @@ class InMemTransactionBuffer implements TransactionBuffer { final ConcurrentMap<TxnID, TxnBuffer> buffers; final Map<Long, Set<TxnID>> txnIndex; + private final Topic topic; + public InMemTransactionBuffer(Topic topic) { this.buffers = new ConcurrentHashMap<>(); this.txnIndex = new HashMap<>(); + this.topic = topic; } @Override @@ -371,7 +374,7 @@ class InMemTransactionBuffer implements TransactionBuffer { @Override public PositionImpl getMaxReadPosition() { - return PositionImpl.LATEST; + return (PositionImpl) topic.getLastPosition(); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index ebf61c3f07f..61bd3b29ab6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -515,8 +515,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen ongoingTxns.remove(txnID); if (!ongoingTxns.isEmpty()) { PositionImpl position = ongoingTxns.get(ongoingTxns.firstKey()); - //max read position is less than first ongoing transaction message position, so entryId -1 - maxReadPosition = PositionImpl.get(position.getLedgerId(), position.getEntryId() - 1); + maxReadPosition = ((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position); } else { maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java index d700195416c..b2a8991189d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java @@ -25,6 +25,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer; import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader; import org.apache.pulsar.broker.transaction.buffer.TransactionMeta; @@ -39,6 +40,11 @@ import org.apache.pulsar.common.util.FutureUtil; @Slf4j public class TransactionBufferDisable implements TransactionBuffer { + private final Topic topic; + public TransactionBufferDisable(Topic topic) { + this.topic = topic; + } + @Override public CompletableFuture<TransactionMeta> getTransactionMeta(TxnID txnID) { return CompletableFuture.completedFuture(null); @@ -90,7 +96,7 @@ public class TransactionBufferDisable implements TransactionBuffer { @Override public PositionImpl getMaxReadPosition() { - return PositionImpl.LATEST; + return (PositionImpl) topic.getLastPosition(); } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java index 514e135f350..cf530d7ce22 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -18,9 +18,17 @@ */ package org.apache.pulsar.broker.transaction.buffer; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.when; +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertTrue; +import static org.testng.AssertJUnit.fail; +import lombok.Cleanup; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.Topic; @@ -29,8 +37,11 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.transaction.TransactionTestBase; import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer; import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState; +import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.transaction.Transaction; +import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; @@ -179,4 +190,119 @@ public class TopicTransactionBufferTest extends TransactionTestBase { Assert.assertTrue(f.isCompletedExceptionally()); } + /** + * This test mainly test the following two point: + * 1. `getLastMessageIds` will get max read position. + * Send two message |1:0|1:1|; mock max read position as |1:0|; `getLastMessageIds` will get |1:0|. + * 2. `getLastMessageIds` will wait Transaction buffer recover completely. + * Mock `checkIfTBRecoverCompletely` return an exception, `getLastMessageIds` will fail too. + * Mock `checkIfTBRecoverCompletely` return null, `getLastMessageIds` will get correct result. + */ + @Test + public void testGetMaxPositionAfterTBReady() throws Exception { + // 1. Prepare test environment. + String topic = "persistent://" + NAMESPACE1 + "/testGetMaxReadyPositionAfterTBReady"; + // 1.1 Mock component. + TransactionBuffer transactionBuffer = Mockito.spy(TransactionBuffer.class); + when(transactionBuffer.checkIfTBRecoverCompletely(anyBoolean())) + // Handle producer will check transaction buffer recover completely. + .thenReturn(CompletableFuture.completedFuture(null)) + // If the Transaction buffer failed to recover, we can not get the correct last max read id. + .thenReturn(CompletableFuture.failedFuture(new Throwable("Mock fail"))) + // If the transaction buffer recover successfully, the max read position can be acquired successfully. + .thenReturn(CompletableFuture.completedFuture(null)); + TransactionBufferProvider transactionBufferProvider = Mockito.spy(TransactionBufferProvider.class); + Mockito.doReturn(transactionBuffer).when(transactionBufferProvider).newTransactionBuffer(any()); + TransactionBufferProvider originalTBProvider = getPulsarServiceList().get(0).getTransactionBufferProvider(); + Mockito.doReturn(transactionBufferProvider).when(getPulsarServiceList().get(0)).getTransactionBufferProvider(); + // 2. Building producer and consumer. + admin.topics().createNonPartitionedTopic(topic); + @Cleanup + Consumer<byte[]> consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName("sub") + .subscribe(); + @Cleanup + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topic) + .create(); + // 3. Send message and test the exception can be handled as expected. + MessageIdImpl messageId = (MessageIdImpl) producer.newMessage().send(); + producer.newMessage().send(); + Mockito.doReturn(new PositionImpl(messageId.getLedgerId(), messageId.getEntryId())) + .when(transactionBuffer).getMaxReadPosition(); + try { + consumer.getLastMessageId(); + fail(); + } catch (PulsarClientException exception) { + assertTrue(exception.getMessage().contains("Failed to recover Transaction Buffer.")); + } + MessageIdImpl msgId = (MessageIdImpl) consumer.getLastMessageId(); + assertEquals(messageId.getLedgerId(), msgId.getLedgerId()); + assertEquals(messageId.getEntryId(), msgId.getEntryId()); + // 4. Clean resource + Mockito.doReturn(originalTBProvider).when(getPulsarServiceList().get(0)).getTransactionBufferProvider(); + } + + /** + * Add a E2E test for the get last message ID. It tests 4 cases. + * <p> + * 1. Only normal messages in the topic. + * 2. There are ongoing transactions, last message ID will not be updated until transaction end. + * 3. Aborted transaction will make the last message ID be updated as expected. + * 4. Committed transaction will make the last message ID be updated as expected. + * </p> + */ + @Test + public void testGetLastMessageIdsWithOngoingTransactions() throws Exception { + // 1. Prepare environment + String topic = "persistent://" + NAMESPACE1 + "/testGetLastMessageIdsWithOngoingTransactions"; + String subName = "my-subscription"; + @Cleanup + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topic) + .create(); + Consumer<byte[]> consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName(subName) + .subscribe(); + + // 2. Test last max read position can be required correctly. + // 2.1 Case1: send 3 original messages. |1:0|1:1|1:2| + MessageIdImpl expectedLastMessageID = null; + for (int i = 0; i < 3; i++) { + expectedLastMessageID = (MessageIdImpl) producer.newMessage().send(); + } + assertMessageId(consumer, expectedLastMessageID, 0); + // 2.2 Case2: send 2 ongoing transactional messages and 2 original messages. + // |1:0|1:1|1:2|txn1->1:3|1:4|txn2->1:5|1:6|. + Transaction txn1 = pulsarClient.newTransaction() + .withTransactionTimeout(5, TimeUnit.HOURS) + .build() + .get(); + Transaction txn2 = pulsarClient.newTransaction() + .withTransactionTimeout(5, TimeUnit.HOURS) + .build() + .get(); + producer.newMessage(txn1).send(); + MessageIdImpl expectedLastMessageID1 = (MessageIdImpl) producer.newMessage().send(); + producer.newMessage(txn2).send(); + MessageIdImpl expectedLastMessageID2 = (MessageIdImpl) producer.newMessage().send(); + // 2.2.1 Last message ID will not change when txn1 and txn2 do not end. + assertMessageId(consumer, expectedLastMessageID, 0); + // 2.2.2 Last message ID will update to 1:4 when txn1 committed. + txn1.commit().get(5, TimeUnit.SECONDS); + assertMessageId(consumer, expectedLastMessageID1, 0); + // 2.2.3 Last message ID will update to 1:6 when txn2 aborted. + txn2.abort().get(5, TimeUnit.SECONDS); + // Todo: We can not ignore the marker's position in this fix. + assertMessageId(consumer, expectedLastMessageID2, 2); + } + + private void assertMessageId(Consumer<?> consumer, MessageIdImpl expected, int entryOffset) throws Exception { + MessageIdImpl msgId = (MessageIdImpl) consumer.getLastMessageId(); + assertEquals(expected.getEntryId(), msgId.getEntryId() - entryOffset); + assertEquals(expected.getLedgerId(), msgId.getLedgerId()); + } + }
