This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ba7469b  [Transaction]Fix lowWaterMark of TopicTransactionBuffer 
(#12312)
ba7469b is described below

commit ba7469b43434bff051f09d98e31f15398d82aaf2
Author: Xiangying Meng <[email protected]>
AuthorDate: Tue Oct 12 12:51:41 2021 +0800

    [Transaction]Fix lowWaterMark of TopicTransactionBuffer (#12312)
    
    ### Motivation
    
    Fix a bug written in handleLowWaterMark.
    When we find that the TxnID of a Transaction in the ongoings which is less 
than LowWaterMark , we need to abort the Transaction which we get in ongoings 
ranther than txnId got in args.
    ### Modifications
    Modify` abortMarker = Markers.newTxnAbortMarker(-1L,   
firstTxn.getMostSigBits(), firstTxn.getLeastSigBits());`
    
    ### Verifying this change
    Modify the original test to cover this problem
---
 .../transaction/buffer/impl/TopicTransactionBuffer.java     |  2 +-
 .../transaction/buffer/TransactionLowWaterMarkTest.java     | 13 ++++++++++---
 2 files changed, 11 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 220b432..d3a2655 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -271,7 +271,7 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
             TxnID firstTxn = ongoingTxns.firstKey();
             if (firstTxn.getMostSigBits() == txnID.getMostSigBits() && 
lowWaterMark >= firstTxn.getLeastSigBits()) {
                 ByteBuf abortMarker = Markers.newTxnAbortMarker(-1L,
-                        txnID.getMostSigBits(), txnID.getLeastSigBits());
+                        firstTxn.getMostSigBits(), firstTxn.getLeastSigBits());
                 try {
                     topic.getManagedLedger().asyncAddEntry(abortMarker, new 
AsyncCallbacks.AddEntryCallback() {
                         @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
index 3e0a8fc..db9d407 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
@@ -33,6 +33,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
+import javax.validation.constraints.AssertTrue;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 
@@ -50,6 +51,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
+import 
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
@@ -63,8 +65,10 @@ import 
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
 import 
org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
 import org.awaitility.Awaitility;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -164,9 +168,12 @@ public class TransactionLowWaterMarkTest extends 
TransactionTestBase {
         field.setAccessible(true);
         field.set(txn, TransactionImpl.State.OPEN);
         producer.newMessage(txn).value(TEST2.getBytes()).send();
-
-        message = consumer.receive(2, TimeUnit.SECONDS);
-        assertNull(message);
+        try {
+            txn.commit().get();
+            Assert.fail("The commit operation should be failed.");
+        } catch (Exception e){
+            Assert.assertTrue(e.getCause() instanceof 
TransactionCoordinatorClientException.TransactionNotFoundException);
+        }
 
         PartitionedTopicMetadata partitionedTopicMetadata =
                 ((PulsarClientImpl) pulsarClient).getLookup()

Reply via email to