This is an automated email from the ASF dual-hosted git repository.
daojun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1f72817aefe [fix] [txn] Get previous position by managed ledger.
(#22024)
1f72817aefe is described below
commit 1f72817aefe9ce9972a0108450711d7074c9dc7f
Author: thetumbled <[email protected]>
AuthorDate: Thu Feb 22 12:07:43 2024 +0800
[fix] [txn] Get previous position by managed ledger. (#22024)
---
.../buffer/impl/TopicTransactionBuffer.java | 4 +-
.../buffer/TopicTransactionBufferTest.java | 58 ++++++++++++++++++++++
2 files changed, 60 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index f356921d698..5392e473947 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -287,8 +287,8 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
.checkAbortedTransaction(txnId)) {
ongoingTxns.put(txnId, (PositionImpl) position);
PositionImpl firstPosition =
ongoingTxns.get(ongoingTxns.firstKey());
- //max read position is less than first ongoing transaction message
position, so entryId -1
- maxReadPosition = PositionImpl.get(firstPosition.getLedgerId(),
firstPosition.getEntryId() - 1);
+ // max read position is less than first ongoing transaction
message position
+ maxReadPosition = ((ManagedLedgerImpl)
topic.getManagedLedger()).getPreviousPosition(firstPosition);
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
index e5ad910cb1f..fad785cc882 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
@@ -40,8 +40,10 @@ import
org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -304,6 +306,62 @@ public class TopicTransactionBufferTest extends
TransactionTestBase {
assertMessageId(consumer, expectedLastMessageID2, 2);
}
+ /**
+ * produce 3 messages and then trigger a ledger switch,
+ * then create a transaction and send a transactional message.
+ * As there are messages in the new ledger, the reader should be able to
read the messages.
+ * But reader.hasMessageAvailable() returns false if the entry id of max
read position is -1.
+ * @throws Exception
+ */
+ @Test
+ public void testGetLastMessageIdsWithOpenTransactionAtLedgerHead() throws
Exception {
+ String topic = "persistent://" + NAMESPACE1 +
"/testGetLastMessageIdsWithOpenTransactionAtLedgerHead";
+ String subName = "my-subscription";
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .create();
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName(subName)
+ .subscribe();
+ MessageId expectedLastMessageID = null;
+ for (int i = 0; i < 3; i++) {
+ expectedLastMessageID =
producer.newMessage().value(String.valueOf(i).getBytes()).send();
+ System.out.println("expectedLastMessageID: " +
expectedLastMessageID);
+ }
+ triggerLedgerSwitch(topic);
+ Transaction txn = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.HOURS)
+ .build()
+ .get();
+ producer.newMessage(txn).send();
+
+ Reader<byte[]> reader = pulsarClient.newReader()
+ .topic(topic)
+ .startMessageId(MessageId.earliest)
+ .create();
+ assertTrue(reader.hasMessageAvailable());
+ }
+
+ private void triggerLedgerSwitch(String topicName) throws Exception{
+ admin.topics().unload(topicName);
+ Awaitility.await().until(() -> {
+ CompletableFuture<Optional<Topic>> topicFuture =
+
getPulsarServiceList().get(0).getBrokerService().getTopic(topicName, false);
+ if (!topicFuture.isDone() ||
topicFuture.isCompletedExceptionally()){
+ return false;
+ }
+ Optional<Topic> topicOptional = topicFuture.join();
+ if (!topicOptional.isPresent()){
+ return false;
+ }
+ PersistentTopic persistentTopic = (PersistentTopic)
topicOptional.get();
+ ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)
persistentTopic.getManagedLedger();
+ return managedLedger.getState() ==
ManagedLedgerImpl.State.LedgerOpened;
+ });
+ }
+
private void assertMessageId(Consumer<?> consumer, MessageIdImpl expected,
int entryOffset) throws Exception {
TopicMessageIdImpl actual = (TopicMessageIdImpl)
consumer.getLastMessageIds().get(0);
assertEquals(expected.getEntryId(), actual.getEntryId() - entryOffset);