This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 35497645437e9459f8358cf14757d99980d2036b Author: Xiangying Meng <[email protected]> AuthorDate: Fri Jan 28 09:34:50 2022 +0800 [Transaction] RetryException should not be return (#13828) Fix https://github.com/apache/pulsar/issues/13792 When the transaction coordinator (TC) gets a RetryException, it retries the operation but also returns the exception to the client. Since the operation is executed again, we don't need to return this RetryException to the client. (cherry picked from commit 0f1913250d962f1763ebb586711a8d9be5ba5d16) --- .../org/apache/pulsar/broker/PulsarService.java | 1 + .../broker/TransactionMetadataStoreService.java | 21 +++++++---- .../org/apache/pulsar/broker/service/Topic.java | 5 +++ .../recover/TransactionRecoverTrackerImpl.java | 6 ++-- .../TransactionMetadataStoreServiceTest.java | 14 +++++--- .../broker/stats/TransactionMetricsTest.java | 6 ++-- .../pulsar/broker/transaction/TransactionTest.java | 41 ++++++++++++++++++++++ 7 files changed, 79 insertions(+), 15 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 79aba81..adeec4c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -127,6 +127,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.transaction.TransactionBufferClient; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.common.conf.InternalConfigurationData; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.configuration.VipStatus; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java index 3f3e8d8..9b60679 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java @@ -321,6 +321,11 @@ public class TransactionMetadataStoreService { public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction, boolean isTimeout) { CompletableFuture<Void> completableFuture = new CompletableFuture<>(); + return endTransaction(txnID, txnAction, isTimeout, completableFuture); + } + + public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction, boolean isTimeout, + CompletableFuture<Void> completableFuture) { TxnStatus newStatus; switch (txnAction) { case TxnAction.COMMIT_VALUE: @@ -352,8 +357,9 @@ public class TransactionMetadataStoreService { + "TxnAction : {}", txnID, txnAction, e); } transactionOpRetryTimer.newTimeout(timeout -> - endTransaction(txnID, txnAction, isTimeout), + endTransaction(txnID, txnAction, isTimeout, completableFuture), endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS); + return null; } completableFuture.completeExceptionally(e.getCause()); @@ -367,8 +373,9 @@ public class TransactionMetadataStoreService { LOG.debug("EndTransaction UpdateTxnStatus op retry! 
TxnId : {}, " + "TxnAction : {}", txnID, txnAction, e); } - transactionOpRetryTimer.newTimeout(timeout -> endTransaction(txnID, txnAction, isTimeout), - endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS); + transactionOpRetryTimer.newTimeout(timeout -> endTransaction(txnID, txnAction, + isTimeout, completableFuture), endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS); + return null; } completableFuture.completeExceptionally(e.getCause()); @@ -385,8 +392,9 @@ public class TransactionMetadataStoreService { + "TxnAction : {}", txnID, txnAction, e); } transactionOpRetryTimer.newTimeout(timeout -> - endTransaction(txnID, txnAction, isTimeout), + endTransaction(txnID, txnAction, isTimeout, completableFuture), endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS); + return null; } else { LOG.error("EndTxnInTransactionBuffer fail! TxnId : {}, " + "TxnAction : {}", txnID, txnAction, e); @@ -406,8 +414,9 @@ public class TransactionMetadataStoreService { if (LOG.isDebugEnabled()) { LOG.debug("End transaction op retry! TxnId : {}, TxnAction : {}", txnID, txnAction, e); } - transactionOpRetryTimer.newTimeout(timeout -> endTransaction(txnID, txnAction, isTimeout), - endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS); + transactionOpRetryTimer.newTimeout(timeout -> endTransaction(txnID, txnAction, isTimeout, + completableFuture), endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS); + return null; } completableFuture.completeExceptionally(e.getCause()); return null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 0e7589a..9dd945f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -288,4 +288,9 @@ public interface Topic { */ CompletableFuture<Void> truncate(); + /** + * Get BrokerService. 
+ * @return + */ + BrokerService getBrokerService(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerImpl.java index 3667e66..05b61fd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerImpl.java @@ -123,9 +123,11 @@ public class TransactionRecoverTrackerImpl implements TransactionRecoverTracker @Override public void handleCommittingAndAbortingTransaction() { committingTransactions.forEach(k -> - transactionMetadataStoreService.endTransaction(new TxnID(tcId, k), TxnAction.COMMIT_VALUE, false)); + transactionMetadataStoreService.endTransaction(new TxnID(tcId, k), TxnAction.COMMIT_VALUE, + false)); abortingTransactions.forEach(k -> - transactionMetadataStoreService.endTransaction(new TxnID(tcId, k), TxnAction.ABORT_VALUE, false)); + transactionMetadataStoreService.endTransaction(new TxnID(tcId, k), TxnAction.ABORT_VALUE, + false)); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java index 983f0d1..e50a39c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java @@ -25,11 +25,13 @@ import java.lang.reflect.Field; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import 
com.google.common.collect.Sets; +import java.util.concurrent.TimeoutException; import org.apache.bookkeeper.mledger.Position; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.ServiceConfiguration; @@ -351,13 +353,15 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase { Field field = TransactionMetadataStoreState.class.getDeclaredField("state"); field.setAccessible(true); field.set(transactionMetadataStore, TransactionMetadataStoreState.State.None); - + CompletableFuture<Void> completableFuture = null; try { - pulsar.getTransactionMetadataStoreService().endTransaction(txnID, TxnAction.COMMIT.getValue(), false).get(); + completableFuture = pulsar.getTransactionMetadataStoreService().endTransaction(txnID, TxnAction.COMMIT.getValue(), + false); + completableFuture.get(5, TimeUnit.SECONDS); fail(); } catch (Exception e) { if (txnStatus == TxnStatus.OPEN || txnStatus == TxnStatus.COMMITTING) { - assertTrue(e.getCause() instanceof CoordinatorException.TransactionMetadataStoreStateException); + assertTrue(e instanceof TimeoutException); } else if (txnStatus == TxnStatus.ABORTING) { assertTrue(e.getCause() instanceof CoordinatorException.InvalidTxnStatusException); } else { @@ -370,9 +374,9 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase { field = TransactionMetadataStoreState.class.getDeclaredField("state"); field.setAccessible(true); field.set(transactionMetadataStore, TransactionMetadataStoreState.State.Ready); - if (txnStatus == TxnStatus.ABORTING) { - pulsar.getTransactionMetadataStoreService().endTransaction(txnID, TxnAction.ABORT.getValue(), false).get(); + pulsar.getTransactionMetadataStoreService() + .endTransaction(txnID, TxnAction.ABORT.getValue(), false).get(); } Awaitility.await().atMost(timeOut, TimeUnit.MILLISECONDS).until(() -> { try { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java index 7c48e81..0c9f877 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java @@ -163,9 +163,11 @@ public class TransactionMetricsTest extends BrokerTestBase { for (int i = 0; i < txnCount; i++) { if (i % 2 == 0) { - pulsar.getTransactionMetadataStoreService().endTransaction(list.get(i), TxnAction.COMMIT_VALUE, false).get(); + pulsar.getTransactionMetadataStoreService().endTransaction(list.get(i), TxnAction.COMMIT_VALUE, + false).get(); } else { - pulsar.getTransactionMetadataStoreService().endTransaction(list.get(i), TxnAction.ABORT_VALUE, false).get(); + pulsar.getTransactionMetadataStoreService().endTransaction(list.get(i), TxnAction.ABORT_VALUE, + false).get(); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index d7d9089..4aaadb2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -45,6 +45,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.AsyncCallbacks; @@ -93,6 +94,7 @@ import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; +import 
org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState; import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker; import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker; import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl; @@ -733,4 +735,43 @@ public class TransactionTest extends TransactionTestBase { TopicTransactionBuffer transactionBuffer = new TopicTransactionBuffer(persistentTopic); Awaitility.await().untilAsserted(() -> Assert.assertTrue(transactionBuffer.checkIfReady())); } + + @Test + public void testRetryExceptionOfEndTxn() throws Exception{ + Transaction transaction = pulsarClient.newTransaction() + .withTransactionTimeout(10, TimeUnit.SECONDS) + .build() + .get(); + Class<TransactionMetadataStoreState> transactionMetadataStoreStateClass = TransactionMetadataStoreState.class; + getPulsarServiceList().get(0).getTransactionMetadataStoreService().getStores() + .values() + .forEach((transactionMetadataStore -> { + try { + Field field = transactionMetadataStoreStateClass.getDeclaredField("state"); + field.setAccessible(true); + field.set(transactionMetadataStore, TransactionMetadataStoreState.State.Initializing); + } catch (Exception e) { + e.printStackTrace(); + } + })); + CompletableFuture<Void> completableFuture = transaction.commit(); + try { + completableFuture.get(5, TimeUnit.SECONDS); + fail(); + } catch (TimeoutException ignored) { + } + getPulsarServiceList().get(0).getTransactionMetadataStoreService().getStores() + .values() + .stream() + .forEach((transactionMetadataStore -> { + try { + Field field = transactionMetadataStoreStateClass.getDeclaredField("state"); + field.setAccessible(true); + field.set(transactionMetadataStore, TransactionMetadataStoreState.State.Ready); + } catch (Exception e) { + e.printStackTrace(); + } + })); + completableFuture.get(5, TimeUnit.SECONDS); + } }
