This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b244d0c3bc625bb4d4d3108735fd2a219d328a50 Author: Xiangying Meng <[email protected]> AuthorDate: Sat Dec 18 08:35:18 2021 +0800 [Transaction]Txn client check timeout (#12521) ### Motivation Optimize the logic on the Transaction Client side. Avoid sending and acking messages with timed-out transactions. ### Modifications * TransactionImpl * Add a helper field, STATE_UPDATE, used to replace the state via CAS. **When committing or aborting, only a successful CAS operation proceeds to the subsequent checks; otherwise a failed future is returned** * Implement TimerTask to run a task that switches the state of an open transaction to TIMEOUT. * TransactionBuilderImpl * In the callback of the build method, use the timer of PulsarClient to start a Timeout, passing in the corresponding TransactionImpl (which implements TimerTask) (cherry picked from commit c5d7a84c8e5c27e48022df8c7082496840cd3be9) --- .../client/impl/ConsumerAckResponseTest.java | 2 + .../client/impl/TransactionEndToEndTest.java | 67 +++++++++++++++++++++- .../TransactionCoordinatorClientException.java | 20 +++++++ .../apache/pulsar/client/impl/ConsumerBase.java | 4 ++ .../client/impl/PartitionedProducerImpl.java | 5 ++ .../apache/pulsar/client/impl/ProducerImpl.java | 4 ++ .../pulsar/client/impl/PulsarClientImpl.java | 1 + .../impl/transaction/TransactionBuilderImpl.java | 9 ++- .../client/impl/transaction/TransactionImpl.java | 38 +++++++++--- 9 files changed, 137 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckResponseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckResponseTest.java index 0378c53..6981865 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckResponseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckResponseTest.java @@ -50,8 +50,10 @@ public class ConsumerAckResponseTest extends ProducerConsumerBase { super.producerBaseSetup(); 
doReturn(1L).when(transaction).getTxnIdLeastBits(); doReturn(1L).when(transaction).getTxnIdMostBits(); + doReturn(TransactionImpl.State.OPEN).when(transaction).getState(); CompletableFuture<Void> completableFuture = CompletableFuture.completedFuture(null); doNothing().when(transaction).registerAckOp(any()); + doReturn(true).when(transaction).checkIfOpen(any()); doReturn(completableFuture).when(transaction).registerAckedTopic(any(), any()); Thread.sleep(1000 * 3); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java index 4630449..f52d319 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java @@ -23,7 +23,9 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import java.lang.reflect.Constructor; import java.lang.reflect.Field; +import java.util.Collection; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.ArrayList; @@ -770,19 +772,46 @@ public class TransactionEndToEndTest extends TransactionTestBase { } }); + Class<TransactionImpl> transactionClass = TransactionImpl.class; + Constructor<TransactionImpl> constructor = transactionClass + .getDeclaredConstructor(PulsarClientImpl.class, long.class, long.class, long.class); + constructor.setAccessible(true); + + TransactionImpl timeoutTxnSkipClientTimeout = constructor.newInstance(pulsarClient, 5, + timeoutTxn.getTxnID().getLeastSigBits(), timeoutTxn.getTxnID().getMostSigBits()); + try { - timeoutTxn.commit().get(); + timeoutTxnSkipClientTimeout.commit().get(); fail(); } catch (Exception e) { assertTrue(e.getCause() instanceof TransactionNotFoundException); } Field field = 
TransactionImpl.class.getDeclaredField("state"); field.setAccessible(true); - TransactionImpl.State state = (TransactionImpl.State) field.get(timeoutTxn); + TransactionImpl.State state = (TransactionImpl.State) field.get(timeoutTxnSkipClientTimeout); assertEquals(state, TransactionImpl.State.ERROR); } @Test + public void testTxnTimeoutAtTransactionMetadataStore() throws Exception{ + TxnID txnID = pulsarServiceList.get(0).getTransactionMetadataStoreService() + .newTransaction(new TransactionCoordinatorID(0), 1).get(); + Awaitility.await().until(() -> { + try { + getPulsarServiceList().get(0).getTransactionMetadataStoreService().getTxnMeta(txnID).get(); + return false; + } catch (Exception e) { + return true; + } + }); + Collection<TransactionMetadataStore> transactionMetadataStores = + getPulsarServiceList().get(0).getTransactionMetadataStoreService().getStores().values(); + long timeoutCount = transactionMetadataStores.stream() + .mapToLong(store -> store.getMetadataStoreStats().timeoutCount).sum(); + Assert.assertEquals(timeoutCount, 1); + } + + @Test public void transactionTimeoutTest() throws Exception { String topic = NAMESPACE1 + "/txn-timeout"; @@ -943,4 +972,38 @@ public class TransactionEndToEndTest extends TransactionTestBase { } assertTrue(flag); } + + @Test + public void testTxnTimeOutInClient() throws Exception{ + String topic = NAMESPACE1 + "/testTxnTimeOutInClient"; + @Cleanup + Producer<String> producer = pulsarClient.newProducer(Schema.STRING).producerName("testTxnTimeOut_producer") + .topic(topic).sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create(); + @Cleanup + Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).consumerName("testTxnTimeOut_consumer") + .topic(topic).subscriptionName("testTxnTimeOut_sub").subscribe(); + + Transaction transaction = pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.SECONDS) + .build().get(); + producer.newMessage().send(); + Awaitility.await().untilAsserted(() -> { + 
Assert.assertEquals(((TransactionImpl)transaction).getState(), TransactionImpl.State.TIMEOUT); + }); + + try { + producer.newMessage(transaction).send(); + Assert.fail(); + } catch (Exception e) { + Assert.assertTrue(e.getCause().getCause() instanceof TransactionCoordinatorClientException + .InvalidTxnStatusException); + } + try { + Message<String> message = consumer.receive(); + consumer.acknowledgeAsync(message.getMessageId(), transaction).get(); + Assert.fail(); + } catch (Exception e) { + Assert.assertTrue(e.getCause() instanceof TransactionCoordinatorClientException + .InvalidTxnStatusException); + } + } } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClientException.java index 0e1f6c7..d7df4e3 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClientException.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClientException.java @@ -68,6 +68,11 @@ public class TransactionCoordinatorClientException extends IOException { public InvalidTxnStatusException(String message) { super(message); } + + public InvalidTxnStatusException(String txnId, String actualState, String expectState) { + super("["+ txnId +"] with unexpected state : " + + actualState + ", expect " + expectState + " state!"); + } } /** @@ -93,6 +98,21 @@ public class TransactionCoordinatorClientException extends IOException { } } + + /** + * Thrown when transaction meta was timeout. 
+ */ + public static class TransactionTimeotException extends TransactionCoordinatorClientException { + + public TransactionTimeotException(Throwable t) { + super(t); + } + + public TransactionTimeotException(String transactionId) { + super("The transaction " + transactionId + " is timeout."); + } + } + /** * Thrown when send request to transaction meta store but the transaction meta store handler not ready. */ diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 1251593..0f47208 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -487,6 +487,10 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T if (null != txn) { checkArgument(txn instanceof TransactionImpl); txnImpl = (TransactionImpl) txn; + CompletableFuture<Void> completableFuture = new CompletableFuture<>(); + if (!txnImpl.checkIfOpen(completableFuture)) { + return completableFuture; + } } return doAcknowledgeWithTxn(messageId, AckType.Individual, Collections.emptyMap(), txnImpl); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java index 3311050..216d775 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java @@ -48,6 +48,7 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.TopicMetadata; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; +import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.common.naming.TopicName; import 
org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; @@ -191,6 +192,10 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> { @Override CompletableFuture<MessageId> internalSendWithTxnAsync(Message<?> message, Transaction txn) { + CompletableFuture<MessageId> completableFuture = new CompletableFuture<>(); + if (txn != null && !((TransactionImpl)txn).checkIfOpen(completableFuture)) { + return completableFuture; + } int partition = routerPolicy.choosePartition(message, topicMetadata); checkArgument(partition >= 0 && partition < topicMetadata.numPartitions(), "Illegal partition index chosen by the message routing policy: " + partition); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index a3ca386..5944c8f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -371,6 +371,10 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne if (txn == null) { return internalSendAsync(message); } else { + CompletableFuture<MessageId> completableFuture = new CompletableFuture<>(); + if (!((TransactionImpl)txn).checkIfOpen(completableFuture)) { + return completableFuture; + } return ((TransactionImpl) txn).registerProducedTopic(topic) .thenCompose(ignored -> internalSendAsync(message)); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 7703afc..c5195c9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -95,6 +95,7 @@ public class PulsarClientImpl implements PulsarClient { protected final 
ClientConfigurationData conf; private LookupService lookup; private final ConnectionPool cnxPool; + @Getter private final Timer timer; private boolean needStopTimer; private final ExecutorProvider externalExecutorProvider; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java index 84be46f..3ac8676 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.client.impl.transaction; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; @@ -67,8 +69,11 @@ public class TransactionBuilderImpl implements TransactionBuilder { future.completeExceptionally(throwable); return; } - future.complete(new TransactionImpl(client, txnTimeout, - txnID.getLeastSigBits(), txnID.getMostSigBits())); + TransactionImpl transaction = new TransactionImpl(client, txnTimeout, + txnID.getLeastSigBits(), txnID.getMostSigBits()); + client.getTimer().newTimeout(transaction, + txnTimeout, timeUnit); + future.complete(transaction); }); return future; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java index 458976a..4128a6f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.client.impl.transaction; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; import 
java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -25,6 +27,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import com.google.common.collect.Lists; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; @@ -48,7 +51,7 @@ import org.apache.pulsar.common.util.FutureUtil; */ @Slf4j @Getter -public class TransactionImpl implements Transaction { +public class TransactionImpl implements Transaction , TimerTask { private final PulsarClientImpl client; private final long transactionTimeoutMs; @@ -63,6 +66,13 @@ public class TransactionImpl implements Transaction { private final ArrayList<CompletableFuture<MessageId>> sendFutureList; private final ArrayList<CompletableFuture<Void>> ackFutureList; private volatile State state; + private final AtomicReferenceFieldUpdater<TransactionImpl, State> STATE_UPDATE = + AtomicReferenceFieldUpdater.newUpdater(TransactionImpl.class, State.class, "state"); + + @Override + public void run(Timeout timeout) throws Exception { + STATE_UPDATE.compareAndSet(this, State.OPEN, State.TIMEOUT); + } public enum State { OPEN, @@ -70,7 +80,8 @@ public class TransactionImpl implements Transaction { ABORTING, COMMITTED, ABORTED, - ERROR + ERROR, + TIMEOUT } TransactionImpl(PulsarClientImpl client, @@ -93,7 +104,8 @@ public class TransactionImpl implements Transaction { // register the topics that will be modified by this transaction public CompletableFuture<Void> registerProducedTopic(String topic) { - return checkIfOpen().thenCompose(value -> { + CompletableFuture<Void> completableFuture = new CompletableFuture<>(); + if (checkIfOpen(completableFuture)) { synchronized (TransactionImpl.this) { // we need to issue the request to TC to register the produced topic return registerPartitionMap.compute(topic, (key, future) -> { @@ -106,7 +118,9 @@ public 
class TransactionImpl implements Transaction { } }); } - }); + } else { + return completableFuture; + } } public synchronized void registerSendOp(CompletableFuture<MessageId> sendFuture) { @@ -115,7 +129,8 @@ public class TransactionImpl implements Transaction { // register the topics that will be modified by this transaction public CompletableFuture<Void> registerAckedTopic(String topic, String subscription) { - return checkIfOpen().thenCompose(value -> { + CompletableFuture<Void> completableFuture = new CompletableFuture<>(); + if (checkIfOpen(completableFuture)) { synchronized (TransactionImpl.this) { // we need to issue the request to TC to register the acked topic return registerSubscriptionMap.compute(Pair.of(topic, subscription), (key, future) -> { @@ -128,7 +143,9 @@ public class TransactionImpl implements Transaction { } }); } - }); + } else { + return completableFuture; + } } public synchronized void registerAckOp(CompletableFuture<Void> ackFuture) { @@ -213,11 +230,14 @@ public class TransactionImpl implements Transaction { return new TxnID(txnIdMostBits, txnIdLeastBits); } - private CompletableFuture<Void> checkIfOpen() { + public <T> boolean checkIfOpen(CompletableFuture<T> completableFuture) { if (state == State.OPEN) { - return CompletableFuture.completedFuture(null); + return true; } else { - return invalidTxnStatusFuture(); + completableFuture + .completeExceptionally(new InvalidTxnStatusException( + new TxnID(txnIdMostBits, txnIdLeastBits).toString(), state.name(), State.OPEN.name())); + return false; } }
