This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 99cab24b363 [fix][txn] Fix getting last message ID when there are
ongoing transactions (#21466)
99cab24b363 is described below
commit 99cab24b3631437a7d143b3624959505f1e19929
Author: Xiangying Meng <[email protected]>
AuthorDate: Wed Dec 13 10:21:11 2023 +0800
[fix][txn] Fix getting last message ID when there are ongoing transactions
(#21466)
---
.../apache/pulsar/broker/service/ServerCnx.java | 37 +++---
.../broker/service/persistent/PersistentTopic.java | 4 +-
.../buffer/impl/InMemTransactionBuffer.java | 5 +-
.../buffer/impl/TopicTransactionBuffer.java | 4 +-
.../buffer/impl/TransactionBufferDisable.java | 8 +-
.../buffer/TopicTransactionBufferTest.java | 131 +++++++++++++++++++++
6 files changed, 167 insertions(+), 22 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 4e61a3228fe..cca53bf9d6a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2063,23 +2063,28 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
long requestId = getLastMessageId.getRequestId();
Topic topic = consumer.getSubscription().getTopic();
- Position lastPosition = topic.getLastPosition();
- int partitionIndex = TopicName.getPartitionIndex(topic.getName());
-
- Position markDeletePosition = null;
- if (consumer.getSubscription() instanceof PersistentSubscription) {
- markDeletePosition = ((PersistentSubscription)
consumer.getSubscription()).getCursor()
- .getMarkDeletedPosition();
- }
-
- getLargestBatchIndexWhenPossible(
- topic,
- (PositionImpl) lastPosition,
- (PositionImpl) markDeletePosition,
- partitionIndex,
- requestId,
- consumer.getSubscription().getName());
+ topic.checkIfTransactionBufferRecoverCompletely(true).thenRun(()
-> {
+ Position lastPosition = ((PersistentTopic)
topic).getMaxReadPosition();
+ int partitionIndex =
TopicName.getPartitionIndex(topic.getName());
+
+ Position markDeletePosition = null;
+ if (consumer.getSubscription() instanceof
PersistentSubscription) {
+ markDeletePosition = ((PersistentSubscription)
consumer.getSubscription()).getCursor()
+ .getMarkDeletedPosition();
+ }
+ getLargestBatchIndexWhenPossible(
+ topic,
+ (PositionImpl) lastPosition,
+ (PositionImpl) markDeletePosition,
+ partitionIndex,
+ requestId,
+ consumer.getSubscription().getName());
+ }).exceptionally(e -> {
+
writeAndFlush(Commands.newError(getLastMessageId.getRequestId(),
+ ServerError.UnknownError, "Failed to recover
Transaction Buffer."));
+ return null;
+ });
} else {
writeAndFlush(Commands.newError(getLastMessageId.getRequestId(),
ServerError.MetadataError, "Consumer not found"));
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 1619c977d1f..bbd4c31c258 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -309,7 +309,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
this.transactionBuffer = brokerService.getPulsar()
.getTransactionBufferProvider().newTransactionBuffer(this);
} else {
- this.transactionBuffer = new TransactionBufferDisable();
+ this.transactionBuffer = new TransactionBufferDisable(this);
}
transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl)
ledger.getLastConfirmedEntry());
if (ledger instanceof ShadowManagedLedgerImpl) {
@@ -402,7 +402,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
this.transactionBuffer = brokerService.getPulsar()
.getTransactionBufferProvider().newTransactionBuffer(this);
} else {
- this.transactionBuffer = new TransactionBufferDisable();
+ this.transactionBuffer = new TransactionBufferDisable(this);
}
shadowSourceTopic = null;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
index bc2dd58a581..978536c5f4e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
@@ -212,9 +212,12 @@ class InMemTransactionBuffer implements TransactionBuffer {
final ConcurrentMap<TxnID, TxnBuffer> buffers;
final Map<Long, Set<TxnID>> txnIndex;
+ private final Topic topic;
+
public InMemTransactionBuffer(Topic topic) {
this.buffers = new ConcurrentHashMap<>();
this.txnIndex = new HashMap<>();
+ this.topic = topic;
}
@Override
@@ -372,7 +375,7 @@ class InMemTransactionBuffer implements TransactionBuffer {
@Override
public PositionImpl getMaxReadPosition() {
- return PositionImpl.LATEST;
+ return (PositionImpl) topic.getLastPosition();
}
@Override
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 3c13be22086..f356921d698 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -37,6 +37,7 @@ import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.map.LinkedMap;
import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -446,8 +447,7 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
ongoingTxns.remove(txnID);
if (!ongoingTxns.isEmpty()) {
PositionImpl position = ongoingTxns.get(ongoingTxns.firstKey());
- //max read position is less than first ongoing transaction message
position, so entryId -1
- maxReadPosition = PositionImpl.get(position.getLedgerId(),
position.getEntryId() - 1);
+ maxReadPosition = ((ManagedLedgerImpl)
topic.getManagedLedger()).getPreviousPosition(position);
} else {
maxReadPosition = (PositionImpl)
topic.getManagedLedger().getLastConfirmedEntry();
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
index 7c74b52951e..9de0888ae5b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
@@ -25,6 +25,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
@@ -40,6 +41,11 @@ import org.apache.pulsar.common.util.FutureUtil;
@Slf4j
public class TransactionBufferDisable implements TransactionBuffer {
+ private final Topic topic;
+ public TransactionBufferDisable(Topic topic) {
+ this.topic = topic;
+ }
+
@Override
public CompletableFuture<TransactionMeta> getTransactionMeta(TxnID txnID) {
return CompletableFuture.completedFuture(null);
@@ -91,7 +97,7 @@ public class TransactionBufferDisable implements
TransactionBuffer {
@Override
public PositionImpl getMaxReadPosition() {
- return PositionImpl.LATEST;
+ return (PositionImpl) topic.getLastPosition();
}
@Override
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
index aa98fc7d701..6ab56a613c5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
@@ -18,9 +18,18 @@
*/
package org.apache.pulsar.broker.transaction.buffer;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.when;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertTrue;
+import static org.testng.AssertJUnit.fail;
+import java.util.List;
+import lombok.Cleanup;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
@@ -30,8 +39,13 @@ import
org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
+import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
@@ -179,4 +193,121 @@ public class TopicTransactionBufferTest extends
TransactionTestBase {
Assert.assertTrue(f.isCompletedExceptionally());
}
+ /**
+ * This test mainly verifies the following two points:
+ * 1. `getLastMessageIds` returns the max read position.
+ * Send two messages |1:0|1:1|; mock the max read position as |1:0|;
`getLastMessageIds` will get |1:0|.
+ * 2. `getLastMessageIds` waits for the transaction buffer to recover
completely.
+ * If the mocked `checkIfTBRecoverCompletely` fails with an exception,
`getLastMessageIds` will fail too.
+ * If the mocked `checkIfTBRecoverCompletely` completes with null,
`getLastMessageIds` will get the correct result.
+ */
+ @Test
+ public void testGetMaxPositionAfterTBReady() throws Exception {
+ // 1. Prepare test environment.
+ String topic = "persistent://" + NAMESPACE1 +
"/testGetMaxReadyPositionAfterTBReady";
+ // 1.1 Mock component.
+ TransactionBuffer transactionBuffer =
Mockito.spy(TransactionBuffer.class);
+ when(transactionBuffer.checkIfTBRecoverCompletely(anyBoolean()))
+ // Handling the producer checks that the transaction buffer has recovered
completely.
+ .thenReturn(CompletableFuture.completedFuture(null))
+ // If the Transaction buffer failed to recover, we can not get
the correct last max read id.
+ .thenReturn(CompletableFuture.failedFuture(new Throwable("Mock
fail")))
+ // If the transaction buffer recover successfully, the max
read position can be acquired successfully.
+ .thenReturn(CompletableFuture.completedFuture(null));
+ TransactionBufferProvider transactionBufferProvider =
Mockito.spy(TransactionBufferProvider.class);
+
Mockito.doReturn(transactionBuffer).when(transactionBufferProvider).newTransactionBuffer(any());
+ TransactionBufferProvider originalTBProvider =
getPulsarServiceList().get(0).getTransactionBufferProvider();
+
Mockito.doReturn(transactionBufferProvider).when(getPulsarServiceList().get(0)).getTransactionBufferProvider();
+ // 2. Building producer and consumer.
+ admin.topics().createNonPartitionedTopic(topic);
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName("sub")
+ .subscribe();
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .create();
+ // 3. Send message and test the exception can be handled as expected.
+ MessageIdImpl messageId = (MessageIdImpl) producer.newMessage().send();
+ producer.newMessage().send();
+ Mockito.doReturn(new PositionImpl(messageId.getLedgerId(),
messageId.getEntryId()))
+ .when(transactionBuffer).getMaxReadPosition();
+ try {
+ consumer.getLastMessageIds();
+ fail();
+ } catch (PulsarClientException exception) {
+ assertTrue(exception.getMessage().contains("Failed to recover
Transaction Buffer."));
+ }
+ List<TopicMessageId> messageIdList = consumer.getLastMessageIds();
+ assertEquals(messageIdList.size(), 1);
+ TopicMessageIdImpl actualMessageID = (TopicMessageIdImpl)
messageIdList.get(0);
+ assertEquals(messageId.getLedgerId(), actualMessageID.getLedgerId());
+ assertEquals(messageId.getEntryId(), actualMessageID.getEntryId());
+ // 4. Clean resource
+
Mockito.doReturn(originalTBProvider).when(getPulsarServiceList().get(0)).getTransactionBufferProvider();
+ }
+
+ /**
+ * An E2E test for getting the last message ID. It tests 4 cases.
+ * <p>
+ * 1. Only normal messages in the topic.
+ * 2. There are ongoing transactions; the last message ID will not be
updated until a transaction ends.
+ * 3. An aborted transaction will make the last message ID be updated
as expected.
+ * 4. A committed transaction will make the last message ID be
updated as expected.
+ * </p>
+ */
+ @Test
+ public void testGetLastMessageIdsWithOngoingTransactions() throws
Exception {
+ // 1. Prepare environment
+ String topic = "persistent://" + NAMESPACE1 +
"/testGetLastMessageIdsWithOngoingTransactions";
+ String subName = "my-subscription";
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .create();
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName(subName)
+ .subscribe();
+
+ // 2. Test that the last max read position can be acquired correctly.
+ // 2.1 Case1: send 3 original messages. |1:0|1:1|1:2|
+ MessageIdImpl expectedLastMessageID = null;
+ for (int i = 0; i < 3; i++) {
+ expectedLastMessageID = (MessageIdImpl)
producer.newMessage().send();
+ }
+ assertMessageId(consumer, expectedLastMessageID, 0);
+ // 2.2 Case2: send 2 ongoing transactional messages and 2 original
messages.
+ // |1:0|1:1|1:2|txn1->1:3|1:4|txn2->1:5|1:6|.
+ Transaction txn1 = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.HOURS)
+ .build()
+ .get();
+ Transaction txn2 = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.HOURS)
+ .build()
+ .get();
+ producer.newMessage(txn1).send();
+ MessageIdImpl expectedLastMessageID1 = (MessageIdImpl)
producer.newMessage().send();
+ producer.newMessage(txn2).send();
+ MessageIdImpl expectedLastMessageID2 = (MessageIdImpl)
producer.newMessage().send();
+ // 2.2.1 Last message ID will not change while txn1 and txn2 have not ended.
+ assertMessageId(consumer, expectedLastMessageID, 0);
+ // 2.2.2 Last message ID will update to 1:4 when txn1 committed.
+ txn1.commit().get(5, TimeUnit.SECONDS);
+ assertMessageId(consumer, expectedLastMessageID1, 0);
+ // 2.2.3 Last message ID will update to 1:6 when txn2 aborted.
+ txn2.abort().get(5, TimeUnit.SECONDS);
+ // TODO: We cannot ignore the marker's position in this fix.
+ assertMessageId(consumer, expectedLastMessageID2, 2);
+ }
+
+ private void assertMessageId(Consumer<?> consumer, MessageIdImpl expected,
int entryOffset) throws Exception {
+ TopicMessageIdImpl actual = (TopicMessageIdImpl)
consumer.getLastMessageIds().get(0);
+ assertEquals(expected.getEntryId(), actual.getEntryId() - entryOffset);
+ assertEquals(expected.getLedgerId(), actual.getLedgerId());
+ }
+
}