This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new f09ec606599 [fix][txn] optimize the ack/send future in TransactionImpl
(#20271)
f09ec606599 is described below
commit f09ec6065992064ab2665cf024c377d9f6b2fcf0
Author: Jiwei Guo <[email protected]>
AuthorDate: Tue May 9 20:31:06 2023 +0800
[fix][txn] optimize the ack/send future in TransactionImpl (#20271)
---
.../java/org/apache/pulsar/RoaringbitmapTest.java | 2 +-
.../pulsar/broker/transaction/TransactionTest.java | 82 ++++++++++++++++++
.../pulsar/client/api/PulsarClientException.java | 24 +++++-
.../pulsar/client/api/transaction/TxnID.java | 23 ++++-
.../client/impl/transaction/TransactionImpl.java | 99 +++++++++++++---------
5 files changed, 183 insertions(+), 47 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/RoaringbitmapTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/RoaringbitmapTest.java
index 477e7413ef9..0f606e22007 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/RoaringbitmapTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/RoaringbitmapTest.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 5aafbb25060..740854a9b9e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -51,8 +51,10 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -318,6 +320,86 @@ public class TransactionTest extends TransactionTestBase {
.subscribe();
}
+ @Test
+ public void testAsyncSendOrAckForSingleFuture() throws Exception {
+ String topic = NAMESPACE1 + "/testSingleFuture";
+ int totalMessage = 10;
+ int threadSize = 30;
+ String topicName = "subscription";
+
getPulsarServiceList().get(0).getConfig().setBrokerDeduplicationEnabled(false);
+ ExecutorService executorService =
Executors.newFixedThreadPool(threadSize);
+
+ //build producer/consumer
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .producerName("producer")
+ .sendTimeout(0, TimeUnit.SECONDS)
+ .create();
+
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionType(SubscriptionType.Exclusive)
+ .subscriptionName(topicName)
+ .subscribe();
+ //store the send/ack result futures
+ CopyOnWriteArrayList<CompletableFuture<MessageId>> sendFutures = new
CopyOnWriteArrayList<>();
+ CopyOnWriteArrayList<CompletableFuture<Void>> ackFutures = new
CopyOnWriteArrayList<>();
+
+ //send and ack messages with transaction
+ Transaction transaction1 = pulsarClient.newTransaction()
+ .withTransactionTimeout(10, TimeUnit.SECONDS)
+ .build()
+ .get();
+
+ for (int i = 0; i < totalMessage * threadSize; i++) {
+ producer.newMessage().send();
+ }
+
+ CountDownLatch countDownLatch = new CountDownLatch(threadSize);
+ for (int i = 0; i < threadSize; i++) {
+ executorService.submit(() -> {
+ try {
+ for (int j = 0; j < totalMessage; j++) {
+ CompletableFuture<MessageId> sendFuture =
producer.newMessage(transaction1).sendAsync();
+ sendFutures.add(sendFuture);
+ Message<byte[]> message = consumer.receive();
+ CompletableFuture<Void> ackFuture =
consumer.acknowledgeAsync(message.getMessageId(),
+ transaction1);
+ ackFutures.add(ackFuture);
+ }
+ countDownLatch.countDown();
+ } catch (Exception e) {
+ log.error("Failed to send/ack messages with transaction.",
e);
+ countDownLatch.countDown();
+ }
+ });
+ }
+ //wait the all send/ack op is executed and store its futures in the
arraylist.
+ countDownLatch.await(10, TimeUnit.SECONDS);
+ transaction1.commit().get();
+
+ //verify the final status is right.
+ Field ackCountField =
TransactionImpl.class.getDeclaredField("opCount");
+ ackCountField.setAccessible(true);
+ long ackCount = (long) ackCountField.get(transaction1);
+ Assert.assertEquals(ackCount, 0L);
+
+ for (int i = 0; i < totalMessage * threadSize; i++) {
+ Assert.assertTrue(sendFutures.get(i).isDone());
+ Assert.assertTrue(ackFutures.get(i).isDone());
+ }
+
+ //verify opFuture without any operation.
+ Transaction transaction2 = pulsarClient.newTransaction()
+ .withTransactionTimeout(10, TimeUnit.SECONDS)
+ .build()
+ .get();
+ Awaitility.await().until(() -> {
+ transaction2.commit().get();
+ return true;
+ });
+ }
+
@Test
public void testGetTxnID() throws Exception {
Transaction transaction = pulsarClient.newTransaction()
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index b207387f963..c68c575ec4f 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -910,6 +910,23 @@ public class PulsarClientException extends IOException {
}
}
+ public static class TransactionHasOperationFailedException extends
PulsarClientException {
+ /**
+ * Constructs an {@code TransactionHasOperationFailedException}.
+ */
+ public TransactionHasOperationFailedException() {
+ super("Now allowed to commit the transaction due to failed
operations of producing or acknowledgment");
+ }
+
+ /**
+ * Constructs an {@code TransactionHasOperationFailedException} with
the specified detail message.
+ * @param msg The detail message.
+ */
+ public TransactionHasOperationFailedException(String msg) {
+ super(msg);
+ }
+ }
+
// wrap an exception to enriching more info messages.
public static Throwable wrap(Throwable t, String msg) {
msg += "\n" + t.getMessage();
@@ -972,6 +989,8 @@ public class PulsarClientException extends IOException {
return new MessageAcknowledgeException(msg);
} else if (t instanceof TransactionConflictException) {
return new TransactionConflictException(msg);
+ } else if (t instanceof TransactionHasOperationFailedException) {
+ return new TransactionHasOperationFailedException(msg);
} else if (t instanceof PulsarClientException) {
return new PulsarClientException(msg);
} else if (t instanceof CompletionException) {
@@ -1070,6 +1089,8 @@ public class PulsarClientException extends IOException {
newException = new MemoryBufferIsFullError(msg);
} else if (cause instanceof NotFoundException) {
newException = new NotFoundException(msg);
+ } else if (cause instanceof TransactionHasOperationFailedException) {
+ newException = new TransactionHasOperationFailedException(msg);
} else {
newException = new PulsarClientException(t);
}
@@ -1133,7 +1154,8 @@ public class PulsarClientException extends IOException {
|| t instanceof MessageAcknowledgeException
|| t instanceof TransactionConflictException
|| t instanceof ProducerBusyException
- || t instanceof ConsumerBusyException) {
+ || t instanceof ConsumerBusyException
+ || t instanceof TransactionHasOperationFailedException) {
return false;
}
return true;
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java
index d503abeafc2..d48039a6f33 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java
@@ -20,7 +20,9 @@ package org.apache.pulsar.client.api.transaction;
import java.io.Serializable;
import java.util.Objects;
+import lombok.AccessLevel;
import lombok.Data;
+import lombok.Getter;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
@@ -48,22 +50,35 @@ public class TxnID implements Serializable {
*/
private final long leastSigBits;
+ @Getter(AccessLevel.NONE)
+ private final transient int hashCode;
+
+ @Getter(AccessLevel.NONE)
+ private final transient String txnStr;
+
+ public TxnID(long mostSigBits, long leastSigBits) {
+ this.mostSigBits = mostSigBits;
+ this.leastSigBits = leastSigBits;
+ this.hashCode = Objects.hash(mostSigBits, leastSigBits);
+ this.txnStr = "(" + mostSigBits + "," + leastSigBits + ")";
+ }
+
@Override
public String toString() {
- return "(" + mostSigBits + "," + leastSigBits + ")";
+ return txnStr;
}
@Override
public int hashCode() {
- return Objects.hash(mostSigBits, leastSigBits);
+ return hashCode;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof TxnID) {
TxnID other = (TxnID) obj;
- return Objects.equals(mostSigBits, other.mostSigBits)
- && Objects.equals(leastSigBits, other.leastSigBits);
+ return mostSigBits == other.mostSigBits
+ && leastSigBits == other.leastSigBits;
}
return false;
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
index 68a10fed2d8..420bdfce478 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
@@ -21,18 +21,18 @@ package org.apache.pulsar.client.impl.transaction;
import com.google.common.collect.Lists;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.Transaction;
import
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.InvalidTxnStatusException;
import
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.TransactionNotFoundException;
@@ -58,15 +58,24 @@ public class TransactionImpl implements Transaction ,
TimerTask {
private final long txnIdLeastBits;
private final long txnIdMostBits;
+ private final TxnID txnId;
+
private final Map<String, CompletableFuture<Void>> registerPartitionMap;
private final Map<Pair<String, String>, CompletableFuture<Void>>
registerSubscriptionMap;
private final TransactionCoordinatorClientImpl tcClient;
- private final ArrayList<CompletableFuture<MessageId>> sendFutureList;
- private final ArrayList<CompletableFuture<Void>> ackFutureList;
+ private CompletableFuture<Void> opFuture;
+
+ private volatile long opCount = 0L;
+ private static final AtomicLongFieldUpdater<TransactionImpl>
OP_COUNT_UPDATE =
+ AtomicLongFieldUpdater.newUpdater(TransactionImpl.class,
"opCount");
+
+
private volatile State state;
private static final AtomicReferenceFieldUpdater<TransactionImpl, State>
STATE_UPDATE =
AtomicReferenceFieldUpdater.newUpdater(TransactionImpl.class,
State.class, "state");
+
+ private volatile boolean hasOpsFailed = false;
private final Timeout timeout;
@Override
@@ -83,13 +92,12 @@ public class TransactionImpl implements Transaction ,
TimerTask {
this.transactionTimeoutMs = transactionTimeoutMs;
this.txnIdLeastBits = txnIdLeastBits;
this.txnIdMostBits = txnIdMostBits;
+ this.txnId = new TxnID(this.txnIdMostBits, this.txnIdLeastBits);
this.registerPartitionMap = new ConcurrentHashMap<>();
this.registerSubscriptionMap = new ConcurrentHashMap<>();
this.tcClient = client.getTcClient();
-
- this.sendFutureList = new ArrayList<>();
- this.ackFutureList = new ArrayList<>();
+ this.opFuture = CompletableFuture.completedFuture(null);
this.timeout = client.getTimer().newTimeout(this,
transactionTimeoutMs, TimeUnit.MILLISECONDS);
}
@@ -105,7 +113,7 @@ public class TransactionImpl implements Transaction ,
TimerTask {
return future.thenCompose(ignored ->
CompletableFuture.completedFuture(null));
} else {
return tcClient.addPublishPartitionToTxnAsync(
- new TxnID(txnIdMostBits, txnIdLeastBits),
Lists.newArrayList(topic))
+ txnId, Lists.newArrayList(topic))
.thenCompose(ignored ->
CompletableFuture.completedFuture(null));
}
});
@@ -114,8 +122,25 @@ public class TransactionImpl implements Transaction ,
TimerTask {
return completableFuture;
}
- public synchronized void registerSendOp(CompletableFuture<MessageId>
sendFuture) {
- sendFutureList.add(sendFuture);
+ public void registerSendOp(CompletableFuture<MessageId> newSendFuture) {
+ if (OP_COUNT_UPDATE.getAndIncrement(this) == 0) {
+ opFuture = new CompletableFuture<>();
+ }
+ // the opCount is always bigger than 0 if there is an exception,
+ // and then the opFuture will never be replaced.
+ newSendFuture.whenComplete((messageId, e) -> {
+ if (e != null) {
+ log.error("The transaction [{}:{}] get an exception when send
messages.",
+ txnIdMostBits, txnIdLeastBits, e);
+ if (!hasOpsFailed) {
+ hasOpsFailed = true;
+ }
+ }
+ CompletableFuture<Void> future = opFuture;
+ if (OP_COUNT_UPDATE.decrementAndGet(this) == 0) {
+ future.complete(null);
+ }
+ });
}
// register the topics that will be modified by this transaction
@@ -129,7 +154,7 @@ public class TransactionImpl implements Transaction ,
TimerTask {
return future.thenCompose(ignored ->
CompletableFuture.completedFuture(null));
} else {
return tcClient.addSubscriptionToTxnAsync(
- new TxnID(txnIdMostBits, txnIdLeastBits),
topic, subscription)
+ txnId, topic, subscription)
.thenCompose(ignored ->
CompletableFuture.completedFuture(null));
}
});
@@ -138,8 +163,25 @@ public class TransactionImpl implements Transaction ,
TimerTask {
return completableFuture;
}
- public synchronized void registerAckOp(CompletableFuture<Void> ackFuture) {
- ackFutureList.add(ackFuture);
+ public void registerAckOp(CompletableFuture<Void> newAckFuture) {
+ if (OP_COUNT_UPDATE.getAndIncrement(this) == 0) {
+ opFuture = new CompletableFuture<>();
+ }
+ // the opCount is always bigger than 0 if there is an exception,
+ // and then the opFuture will never be replaced.
+ newAckFuture.whenComplete((ignore, e) -> {
+ if (e != null) {
+ log.error("The transaction [{}:{}] get an exception when ack
messages.",
+ txnIdMostBits, txnIdLeastBits, e);
+ if (!hasOpsFailed) {
+ hasOpsFailed = true;
+ }
+ }
+ CompletableFuture<Void> future = opFuture;
+ if (OP_COUNT_UPDATE.decrementAndGet(this) == 0) {
+ future.complete(null);
+ }
+ });
}
@Override
@@ -148,19 +190,13 @@ public class TransactionImpl implements Transaction ,
TimerTask {
return checkState(State.OPEN, State.COMMITTING).thenCompose((value) ->
{
CompletableFuture<Void> commitFuture = new CompletableFuture<>();
this.state = State.COMMITTING;
-<<<<<<< HEAD
- allOpComplete().whenComplete((v, e) -> {
- if (e != null) {
- abort().whenComplete((vx, ex) ->
commitFuture.completeExceptionally(e));
-=======
opFuture.whenComplete((v, e) -> {
if (hasOpsFailed) {
checkState(State.COMMITTING).thenCompose(__ ->
internalAbort()).whenComplete((vx, ex) ->
commitFuture.completeExceptionally(
new
PulsarClientException.TransactionHasOperationFailedException()));
->>>>>>> 00d09cbbd2b ([fix][txn] Fix transaction is not aborted when send or
ACK failed (#20055))
} else {
- tcClient.commitAsync(new TxnID(txnIdMostBits,
txnIdLeastBits))
+ tcClient.commitAsync(txnId)
.whenComplete((vx, ex) -> {
if (ex != null) {
if (ex instanceof
TransactionNotFoundException
@@ -182,19 +218,8 @@ public class TransactionImpl implements Transaction ,
TimerTask {
@Override
public CompletableFuture<Void> abort() {
timeout.cancel();
-<<<<<<< HEAD
- return checkIfOpenOrAborting().thenCompose(value -> {
- CompletableFuture<Void> abortFuture = new CompletableFuture<>();
- this.state = State.ABORTING;
- allOpComplete().whenComplete((v, e) -> {
- if (e != null) {
- log.error(e.getMessage());
- }
- tcClient.abortAsync(new TxnID(txnIdMostBits,
txnIdLeastBits)).whenComplete((vx, ex) -> {
-=======
return checkState(State.OPEN, State.ABORTING).thenCompose(__ ->
internalAbort());
}
->>>>>>> 00d09cbbd2b ([fix][txn] Fix transaction is not aborted when send or
ACK failed (#20055))
private CompletableFuture<Void> internalAbort() {
CompletableFuture<Void> abortFuture = new CompletableFuture<>();
@@ -221,7 +246,7 @@ public class TransactionImpl implements Transaction ,
TimerTask {
@Override
public TxnID getTxnID() {
- return new TxnID(txnIdMostBits, txnIdLeastBits);
+ return this.txnId;
}
@Override
@@ -235,7 +260,7 @@ public class TransactionImpl implements Transaction ,
TimerTask {
} else {
completableFuture
.completeExceptionally(new InvalidTxnStatusException(
- new TxnID(txnIdMostBits,
txnIdLeastBits).toString(), state.name(), State.OPEN.name()));
+ txnId.toString(), state.name(),
State.OPEN.name()));
return false;
}
}
@@ -251,12 +276,4 @@ public class TransactionImpl implements Transaction ,
TimerTask {
+ txnIdLeastBits + "] with unexpected state: " +
actualState.name() + ", expect: "
+ Arrays.toString(expectedStates)));
}
-
-
- private CompletableFuture<Void> allOpComplete() {
- List<CompletableFuture<?>> futureList = new ArrayList<>();
- futureList.addAll(sendFutureList);
- futureList.addAll(ackFutureList);
- return CompletableFuture.allOf(futureList.toArray(new
CompletableFuture[0]));
- }
}