This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ba7469b [Transaction]Fix lowWaterMark of TopicTransactionBuffer
(#12312)
ba7469b is described below
commit ba7469b43434bff051f09d98e31f15398d82aaf2
Author: Xiangying Meng <[email protected]>
AuthorDate: Tue Oct 12 12:51:41 2021 +0800
[Transaction]Fix lowWaterMark of TopicTransactionBuffer (#12312)
### Motivation
Fix a bug written in handleLowWaterMark.
When we find that the TxnID of a Transaction in the ongoings which is less
than LowWaterMark , we need to abort the Transaction which we get in ongoings
ranther than txnId got in args.
### Modifications
Modify` abortMarker = Markers.newTxnAbortMarker(-1L,
firstTxn.getMostSigBits(), firstTxn.getLeastSigBits());`
### Verifying this change
Modify the original test to cover this problem
---
.../transaction/buffer/impl/TopicTransactionBuffer.java | 2 +-
.../transaction/buffer/TransactionLowWaterMarkTest.java | 13 ++++++++++---
2 files changed, 11 insertions(+), 4 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 220b432..d3a2655 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -271,7 +271,7 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
TxnID firstTxn = ongoingTxns.firstKey();
if (firstTxn.getMostSigBits() == txnID.getMostSigBits() &&
lowWaterMark >= firstTxn.getLeastSigBits()) {
ByteBuf abortMarker = Markers.newTxnAbortMarker(-1L,
- txnID.getMostSigBits(), txnID.getLeastSigBits());
+ firstTxn.getMostSigBits(), firstTxn.getLeastSigBits());
try {
topic.getManagedLedger().asyncAddEntry(abortMarker, new
AsyncCallbacks.AddEntryCallback() {
@Override
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
index 3e0a8fc..db9d407 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
@@ -33,6 +33,7 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import javax.validation.constraints.AssertTrue;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
@@ -50,6 +51,7 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
+import
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
@@ -63,8 +65,10 @@ import
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
+import
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
import
org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.awaitility.Awaitility;
+import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -164,9 +168,12 @@ public class TransactionLowWaterMarkTest extends
TransactionTestBase {
field.setAccessible(true);
field.set(txn, TransactionImpl.State.OPEN);
producer.newMessage(txn).value(TEST2.getBytes()).send();
-
- message = consumer.receive(2, TimeUnit.SECONDS);
- assertNull(message);
+ try {
+ txn.commit().get();
+ Assert.fail("The commit operation should be failed.");
+ } catch (Exception e){
+ Assert.assertTrue(e.getCause() instanceof
TransactionCoordinatorClientException.TransactionNotFoundException);
+ }
PartitionedTopicMetadata partitionedTopicMetadata =
((PulsarClientImpl) pulsarClient).getLookup()