This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 35497645437e9459f8358cf14757d99980d2036b
Author: Xiangying Meng <[email protected]>
AuthorDate: Fri Jan 28 09:34:50 2022 +0800

    [Transaction] RetryException should not be returned (#13828)
    
    Fix https://github.com/apache/pulsar/issues/13792
    When the TC gets a RetryException, it retries the operation but also returns 
the exception to the client. Since the operation is executed again, we don't need 
to return this RetryException to the client.
    
    (cherry picked from commit 0f1913250d962f1763ebb586711a8d9be5ba5d16)
---
 .../org/apache/pulsar/broker/PulsarService.java    |  1 +
 .../broker/TransactionMetadataStoreService.java    | 21 +++++++----
 .../org/apache/pulsar/broker/service/Topic.java    |  5 +++
 .../recover/TransactionRecoverTrackerImpl.java     |  6 ++--
 .../TransactionMetadataStoreServiceTest.java       | 14 +++++---
 .../broker/stats/TransactionMetricsTest.java       |  6 ++--
 .../pulsar/broker/transaction/TransactionTest.java | 41 ++++++++++++++++++++++
 7 files changed, 79 insertions(+), 15 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 79aba81..adeec4c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -127,6 +127,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.conf.InternalConfigurationData;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.configuration.VipStatus;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 3f3e8d8..9b60679 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -321,6 +321,11 @@ public class TransactionMetadataStoreService {
 
     public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction, 
boolean isTimeout) {
         CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        return endTransaction(txnID, txnAction, isTimeout, completableFuture);
+    }
+
+    public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction, 
boolean isTimeout,
+                                                  CompletableFuture<Void> 
completableFuture) {
         TxnStatus newStatus;
         switch (txnAction) {
             case TxnAction.COMMIT_VALUE:
@@ -352,8 +357,9 @@ public class TransactionMetadataStoreService {
                                             + "TxnAction : {}", txnID, 
txnAction, e);
                                 }
                                 transactionOpRetryTimer.newTimeout(timeout ->
-                                        endTransaction(txnID, txnAction, 
isTimeout),
+                                        endTransaction(txnID, txnAction, 
isTimeout, completableFuture),
                                         endTransactionRetryIntervalTime, 
TimeUnit.MILLISECONDS);
+                                return null;
 
                             }
                             
completableFuture.completeExceptionally(e.getCause());
@@ -367,8 +373,9 @@ public class TransactionMetadataStoreService {
                             LOG.debug("EndTransaction UpdateTxnStatus op 
retry! TxnId : {}, "
                                     + "TxnAction : {}", txnID, txnAction, e);
                         }
-                        transactionOpRetryTimer.newTimeout(timeout -> 
endTransaction(txnID, txnAction, isTimeout),
-                                endTransactionRetryIntervalTime, 
TimeUnit.MILLISECONDS);
+                        transactionOpRetryTimer.newTimeout(timeout -> 
endTransaction(txnID, txnAction,
+                                isTimeout, completableFuture), 
endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS);
+                        return null;
 
                     }
                     completableFuture.completeExceptionally(e.getCause());
@@ -385,8 +392,9 @@ public class TransactionMetadataStoreService {
                                         + "TxnAction : {}", txnID, txnAction, 
e);
                             }
                             transactionOpRetryTimer.newTimeout(timeout ->
-                                            endTransaction(txnID, txnAction, 
isTimeout),
+                                            endTransaction(txnID, txnAction, 
isTimeout, completableFuture),
                                     endTransactionRetryIntervalTime, 
TimeUnit.MILLISECONDS);
+                            return null;
                         } else {
                             LOG.error("EndTxnInTransactionBuffer fail! TxnId : 
{}, "
                                     + "TxnAction : {}", txnID, txnAction, e);
@@ -406,8 +414,9 @@ public class TransactionMetadataStoreService {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("End transaction op retry! TxnId : {}, TxnAction 
: {}", txnID, txnAction, e);
                 }
-                transactionOpRetryTimer.newTimeout(timeout -> 
endTransaction(txnID, txnAction, isTimeout),
-                        endTransactionRetryIntervalTime, 
TimeUnit.MILLISECONDS);
+                transactionOpRetryTimer.newTimeout(timeout -> 
endTransaction(txnID, txnAction, isTimeout,
+                        completableFuture), endTransactionRetryIntervalTime, 
TimeUnit.MILLISECONDS);
+                return null;
             }
             completableFuture.completeExceptionally(e.getCause());
             return null;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 0e7589a..9dd945f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -288,4 +288,9 @@ public interface Topic {
      */
     CompletableFuture<Void> truncate();
 
+    /**
+     * Get BrokerService.
+     * @return
+     */
+    BrokerService getBrokerService();
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerImpl.java
index 3667e66..05b61fd 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerImpl.java
@@ -123,9 +123,11 @@ public class TransactionRecoverTrackerImpl implements 
TransactionRecoverTracker
     @Override
     public void handleCommittingAndAbortingTransaction() {
         committingTransactions.forEach(k ->
-                transactionMetadataStoreService.endTransaction(new TxnID(tcId, 
k), TxnAction.COMMIT_VALUE, false));
+                transactionMetadataStoreService.endTransaction(new TxnID(tcId, 
k), TxnAction.COMMIT_VALUE,
+                        false));
 
         abortingTransactions.forEach(k ->
-                transactionMetadataStoreService.endTransaction(new TxnID(tcId, 
k), TxnAction.ABORT_VALUE, false));
+                transactionMetadataStoreService.endTransaction(new TxnID(tcId, 
k), TxnAction.ABORT_VALUE,
+                        false));
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
index 983f0d1..e50a39c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
@@ -25,11 +25,13 @@ import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Sets;
+import java.util.concurrent.TimeoutException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -351,13 +353,15 @@ public class TransactionMetadataStoreServiceTest extends 
BrokerTestBase {
         Field field = 
TransactionMetadataStoreState.class.getDeclaredField("state");
         field.setAccessible(true);
         field.set(transactionMetadataStore, 
TransactionMetadataStoreState.State.None);
-
+        CompletableFuture<Void> completableFuture = null;
         try {
-            pulsar.getTransactionMetadataStoreService().endTransaction(txnID, 
TxnAction.COMMIT.getValue(), false).get();
+            completableFuture = 
pulsar.getTransactionMetadataStoreService().endTransaction(txnID, 
TxnAction.COMMIT.getValue(),
+                    false);
+            completableFuture.get(5, TimeUnit.SECONDS);
             fail();
         } catch (Exception e) {
             if (txnStatus == TxnStatus.OPEN || txnStatus == 
TxnStatus.COMMITTING) {
-                assertTrue(e.getCause() instanceof 
CoordinatorException.TransactionMetadataStoreStateException);
+                assertTrue(e instanceof TimeoutException);
             } else if (txnStatus == TxnStatus.ABORTING) {
                 assertTrue(e.getCause() instanceof 
CoordinatorException.InvalidTxnStatusException);
             } else {
@@ -370,9 +374,9 @@ public class TransactionMetadataStoreServiceTest extends 
BrokerTestBase {
         field = TransactionMetadataStoreState.class.getDeclaredField("state");
         field.setAccessible(true);
         field.set(transactionMetadataStore, 
TransactionMetadataStoreState.State.Ready);
-
         if (txnStatus == TxnStatus.ABORTING) {
-            pulsar.getTransactionMetadataStoreService().endTransaction(txnID, 
TxnAction.ABORT.getValue(), false).get();
+            pulsar.getTransactionMetadataStoreService()
+                    .endTransaction(txnID, TxnAction.ABORT.getValue(), 
false).get();
         }
         Awaitility.await().atMost(timeOut, TimeUnit.MILLISECONDS).until(() -> {
             try {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
index 7c48e81..0c9f877 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
@@ -163,9 +163,11 @@ public class TransactionMetricsTest extends BrokerTestBase 
{
 
         for (int i = 0; i < txnCount; i++) {
             if (i % 2 == 0) {
-                
pulsar.getTransactionMetadataStoreService().endTransaction(list.get(i), 
TxnAction.COMMIT_VALUE, false).get();
+                
pulsar.getTransactionMetadataStoreService().endTransaction(list.get(i), 
TxnAction.COMMIT_VALUE,
+                                false).get();
             } else {
-                
pulsar.getTransactionMetadataStoreService().endTransaction(list.get(i), 
TxnAction.ABORT_VALUE, false).get();
+                
pulsar.getTransactionMetadataStoreService().endTransaction(list.get(i), 
TxnAction.ABORT_VALUE,
+                        false).get();
             }
         }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index d7d9089..4aaadb2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -45,6 +45,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -93,6 +94,7 @@ import 
org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
 import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
 import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
@@ -733,4 +735,43 @@ public class TransactionTest extends TransactionTestBase {
         TopicTransactionBuffer transactionBuffer = new 
TopicTransactionBuffer(persistentTopic);
         Awaitility.await().untilAsserted(() -> 
Assert.assertTrue(transactionBuffer.checkIfReady()));
     }
+
+    @Test
+    public void testRetryExceptionOfEndTxn() throws Exception{
+        Transaction transaction = pulsarClient.newTransaction()
+                .withTransactionTimeout(10, TimeUnit.SECONDS)
+                .build()
+                .get();
+        Class<TransactionMetadataStoreState> 
transactionMetadataStoreStateClass = TransactionMetadataStoreState.class;
+        
getPulsarServiceList().get(0).getTransactionMetadataStoreService().getStores()
+                .values()
+                .forEach((transactionMetadataStore -> {
+                    try {
+                        Field field = 
transactionMetadataStoreStateClass.getDeclaredField("state");
+                        field.setAccessible(true);
+                        field.set(transactionMetadataStore, 
TransactionMetadataStoreState.State.Initializing);
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }));
+        CompletableFuture<Void> completableFuture =  transaction.commit();
+        try {
+            completableFuture.get(5, TimeUnit.SECONDS);
+            fail();
+        } catch (TimeoutException ignored) {
+        }
+        
getPulsarServiceList().get(0).getTransactionMetadataStoreService().getStores()
+                .values()
+                .stream()
+                .forEach((transactionMetadataStore -> {
+                    try {
+                        Field field = 
transactionMetadataStoreStateClass.getDeclaredField("state");
+                        field.setAccessible(true);
+                        field.set(transactionMetadataStore, 
TransactionMetadataStoreState.State.Ready);
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }));
+        completableFuture.get(5, TimeUnit.SECONDS);
+    }
 }

Reply via email to