This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new f09ec606599 [fix][txn] optimize the ack/send future in TransactionImpl 
(#20271)
f09ec606599 is described below

commit f09ec6065992064ab2665cf024c377d9f6b2fcf0
Author: Jiwei Guo <[email protected]>
AuthorDate: Tue May 9 20:31:06 2023 +0800

    [fix][txn] optimize the ack/send future in TransactionImpl (#20271)
---
 .../java/org/apache/pulsar/RoaringbitmapTest.java  |  2 +-
 .../pulsar/broker/transaction/TransactionTest.java | 82 ++++++++++++++++++
 .../pulsar/client/api/PulsarClientException.java   | 24 +++++-
 .../pulsar/client/api/transaction/TxnID.java       | 23 ++++-
 .../client/impl/transaction/TransactionImpl.java   | 99 +++++++++++++---------
 5 files changed, 183 insertions(+), 47 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/RoaringbitmapTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/RoaringbitmapTest.java
index 477e7413ef9..0f606e22007 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/RoaringbitmapTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/RoaringbitmapTest.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 5aafbb25060..740854a9b9e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -51,8 +51,10 @@ import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -318,6 +320,86 @@ public class TransactionTest extends TransactionTestBase {
                 .subscribe();
     }
 
+    @Test
+    public void testAsyncSendOrAckForSingleFuture() throws Exception {
+        String topic = NAMESPACE1 + "/testSingleFuture";
+        int totalMessage = 10;
+        int threadSize = 30;
+        String topicName = "subscription";
+        
getPulsarServiceList().get(0).getConfig().setBrokerDeduplicationEnabled(false);
+        ExecutorService executorService = 
Executors.newFixedThreadPool(threadSize);
+
+        //build producer/consumer
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .producerName("producer")
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create();
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName(topicName)
+                .subscribe();
+        //store the send/ack result  futures
+        CopyOnWriteArrayList<CompletableFuture<MessageId>> sendFutures = new 
CopyOnWriteArrayList<>();
+        CopyOnWriteArrayList<CompletableFuture<Void>> ackFutures = new 
CopyOnWriteArrayList<>();
+
+        //send and ack messages with transaction
+        Transaction transaction1 = pulsarClient.newTransaction()
+                .withTransactionTimeout(10, TimeUnit.SECONDS)
+                .build()
+                .get();
+
+        for (int i = 0; i < totalMessage * threadSize; i++) {
+            producer.newMessage().send();
+        }
+
+        CountDownLatch countDownLatch = new CountDownLatch(threadSize);
+        for (int i = 0; i < threadSize; i++) {
+            executorService.submit(() -> {
+                try {
+                    for (int j = 0; j < totalMessage; j++) {
+                        CompletableFuture<MessageId> sendFuture = 
producer.newMessage(transaction1).sendAsync();
+                        sendFutures.add(sendFuture);
+                        Message<byte[]> message = consumer.receive();
+                        CompletableFuture<Void> ackFuture = 
consumer.acknowledgeAsync(message.getMessageId(),
+                                transaction1);
+                        ackFutures.add(ackFuture);
+                    }
+                    countDownLatch.countDown();
+                } catch (Exception e) {
+                    log.error("Failed to send/ack messages with transaction.", 
e);
+                    countDownLatch.countDown();
+                }
+            });
+        }
+        //wait the all send/ack op is executed and store its futures in the 
arraylist.
+        countDownLatch.await(10, TimeUnit.SECONDS);
+        transaction1.commit().get();
+
+        //verify the final status is right.
+        Field ackCountField = 
TransactionImpl.class.getDeclaredField("opCount");
+        ackCountField.setAccessible(true);
+        long ackCount = (long) ackCountField.get(transaction1);
+        Assert.assertEquals(ackCount, 0L);
+
+        for (int i = 0; i < totalMessage * threadSize; i++) {
+            Assert.assertTrue(sendFutures.get(i).isDone());
+            Assert.assertTrue(ackFutures.get(i).isDone());
+        }
+
+        //verify opFuture without any operation.
+        Transaction transaction2 = pulsarClient.newTransaction()
+                .withTransactionTimeout(10, TimeUnit.SECONDS)
+                .build()
+                .get();
+        Awaitility.await().until(() -> {
+            transaction2.commit().get();
+            return true;
+        });
+    }
+
     @Test
     public void testGetTxnID() throws Exception {
         Transaction transaction = pulsarClient.newTransaction()
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index b207387f963..c68c575ec4f 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -910,6 +910,23 @@ public class PulsarClientException extends IOException {
         }
     }
 
+    public static class TransactionHasOperationFailedException extends 
PulsarClientException {
+        /**
+         * Constructs an {@code TransactionHasOperationFailedException}.
+         */
+        public TransactionHasOperationFailedException() {
+            super("Now allowed to commit the transaction due to failed 
operations of producing or acknowledgment");
+        }
+
+        /**
+         * Constructs an {@code TransactionHasOperationFailedException} with 
the specified detail message.
+         * @param msg The detail message.
+         */
+        public TransactionHasOperationFailedException(String msg) {
+            super(msg);
+        }
+    }
+
     // wrap an exception to enriching more info messages.
     public static Throwable wrap(Throwable t, String msg) {
         msg += "\n" + t.getMessage();
@@ -972,6 +989,8 @@ public class PulsarClientException extends IOException {
             return new MessageAcknowledgeException(msg);
         } else if (t instanceof TransactionConflictException) {
             return new TransactionConflictException(msg);
+        } else if (t instanceof  TransactionHasOperationFailedException) {
+            return new TransactionHasOperationFailedException(msg);
         } else if (t instanceof PulsarClientException) {
             return new PulsarClientException(msg);
         } else if (t instanceof CompletionException) {
@@ -1070,6 +1089,8 @@ public class PulsarClientException extends IOException {
             newException = new MemoryBufferIsFullError(msg);
         } else if (cause instanceof NotFoundException) {
             newException = new NotFoundException(msg);
+        } else if (cause instanceof TransactionHasOperationFailedException) {
+            newException = new TransactionHasOperationFailedException(msg);
         } else {
             newException = new PulsarClientException(t);
         }
@@ -1133,7 +1154,8 @@ public class PulsarClientException extends IOException {
                 || t instanceof MessageAcknowledgeException
                 || t instanceof TransactionConflictException
                 || t instanceof ProducerBusyException
-                || t instanceof ConsumerBusyException) {
+                || t instanceof ConsumerBusyException
+                || t instanceof TransactionHasOperationFailedException) {
             return false;
         }
         return true;
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java
index d503abeafc2..d48039a6f33 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java
@@ -20,7 +20,9 @@ package org.apache.pulsar.client.api.transaction;
 
 import java.io.Serializable;
 import java.util.Objects;
+import lombok.AccessLevel;
 import lombok.Data;
+import lombok.Getter;
 import org.apache.pulsar.common.classification.InterfaceAudience;
 import org.apache.pulsar.common.classification.InterfaceStability;
 
@@ -48,22 +50,35 @@ public class TxnID implements Serializable {
      */
     private final long leastSigBits;
 
+    @Getter(AccessLevel.NONE)
+    private final transient int hashCode;
+
+    @Getter(AccessLevel.NONE)
+    private final transient String txnStr;
+
+    public TxnID(long mostSigBits, long leastSigBits) {
+        this.mostSigBits = mostSigBits;
+        this.leastSigBits = leastSigBits;
+        this.hashCode = Objects.hash(mostSigBits, leastSigBits);
+        this.txnStr = "(" + mostSigBits + "," + leastSigBits + ")";
+    }
+
     @Override
     public String toString() {
-        return "(" + mostSigBits + "," + leastSigBits + ")";
+        return txnStr;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(mostSigBits, leastSigBits);
+        return hashCode;
     }
 
     @Override
     public boolean equals(Object obj) {
         if (obj instanceof TxnID) {
             TxnID other = (TxnID) obj;
-            return Objects.equals(mostSigBits, other.mostSigBits)
-                    && Objects.equals(leastSigBits, other.leastSigBits);
+            return mostSigBits == other.mostSigBits
+                    && leastSigBits == other.leastSigBits;
         }
 
         return false;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
index 68a10fed2d8..420bdfce478 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
@@ -21,18 +21,18 @@ package org.apache.pulsar.client.impl.transaction;
 import com.google.common.collect.Lists;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.Transaction;
 import 
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.InvalidTxnStatusException;
 import 
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.TransactionNotFoundException;
@@ -58,15 +58,24 @@ public class TransactionImpl implements Transaction , 
TimerTask {
     private final long txnIdLeastBits;
     private final long txnIdMostBits;
 
+    private final TxnID txnId;
+
     private final Map<String, CompletableFuture<Void>> registerPartitionMap;
     private final Map<Pair<String, String>, CompletableFuture<Void>> 
registerSubscriptionMap;
     private final TransactionCoordinatorClientImpl tcClient;
 
-    private final ArrayList<CompletableFuture<MessageId>> sendFutureList;
-    private final ArrayList<CompletableFuture<Void>> ackFutureList;
+    private CompletableFuture<Void> opFuture;
+
+    private volatile long opCount = 0L;
+    private static final AtomicLongFieldUpdater<TransactionImpl> 
OP_COUNT_UPDATE =
+            AtomicLongFieldUpdater.newUpdater(TransactionImpl.class, 
"opCount");
+
+
     private volatile State state;
     private static final AtomicReferenceFieldUpdater<TransactionImpl, State> 
STATE_UPDATE =
         AtomicReferenceFieldUpdater.newUpdater(TransactionImpl.class, 
State.class, "state");
+
+    private volatile boolean hasOpsFailed = false;
     private final Timeout timeout;
 
     @Override
@@ -83,13 +92,12 @@ public class TransactionImpl implements Transaction , 
TimerTask {
         this.transactionTimeoutMs = transactionTimeoutMs;
         this.txnIdLeastBits = txnIdLeastBits;
         this.txnIdMostBits = txnIdMostBits;
+        this.txnId = new TxnID(this.txnIdMostBits, this.txnIdLeastBits);
 
         this.registerPartitionMap = new ConcurrentHashMap<>();
         this.registerSubscriptionMap = new ConcurrentHashMap<>();
         this.tcClient = client.getTcClient();
-
-        this.sendFutureList = new ArrayList<>();
-        this.ackFutureList = new ArrayList<>();
+        this.opFuture = CompletableFuture.completedFuture(null);
         this.timeout = client.getTimer().newTimeout(this, 
transactionTimeoutMs, TimeUnit.MILLISECONDS);
 
     }
@@ -105,7 +113,7 @@ public class TransactionImpl implements Transaction , 
TimerTask {
                         return future.thenCompose(ignored -> 
CompletableFuture.completedFuture(null));
                     } else {
                         return tcClient.addPublishPartitionToTxnAsync(
-                                new TxnID(txnIdMostBits, txnIdLeastBits), 
Lists.newArrayList(topic))
+                                txnId, Lists.newArrayList(topic))
                                 .thenCompose(ignored -> 
CompletableFuture.completedFuture(null));
                     }
                 });
@@ -114,8 +122,25 @@ public class TransactionImpl implements Transaction , 
TimerTask {
         return completableFuture;
     }
 
-    public synchronized void registerSendOp(CompletableFuture<MessageId> 
sendFuture) {
-        sendFutureList.add(sendFuture);
+    public void registerSendOp(CompletableFuture<MessageId> newSendFuture) {
+        if (OP_COUNT_UPDATE.getAndIncrement(this) == 0) {
+            opFuture = new CompletableFuture<>();
+        }
+        // the opCount is always bigger than 0 if there is an exception,
+        // and then the opFuture will never be replaced.
+        newSendFuture.whenComplete((messageId, e) -> {
+            if (e != null) {
+                log.error("The transaction [{}:{}] get an exception when send 
messages.",
+                        txnIdMostBits, txnIdLeastBits, e);
+                if (!hasOpsFailed) {
+                    hasOpsFailed = true;
+                }
+            }
+            CompletableFuture<Void> future = opFuture;
+            if (OP_COUNT_UPDATE.decrementAndGet(this) == 0) {
+                future.complete(null);
+            }
+        });
     }
 
     // register the topics that will be modified by this transaction
@@ -129,7 +154,7 @@ public class TransactionImpl implements Transaction , 
TimerTask {
                         return future.thenCompose(ignored -> 
CompletableFuture.completedFuture(null));
                     } else {
                         return tcClient.addSubscriptionToTxnAsync(
-                                new TxnID(txnIdMostBits, txnIdLeastBits), 
topic, subscription)
+                                txnId, topic, subscription)
                                 .thenCompose(ignored -> 
CompletableFuture.completedFuture(null));
                     }
                 });
@@ -138,8 +163,25 @@ public class TransactionImpl implements Transaction , 
TimerTask {
         return completableFuture;
     }
 
-    public synchronized void registerAckOp(CompletableFuture<Void> ackFuture) {
-        ackFutureList.add(ackFuture);
+    public void registerAckOp(CompletableFuture<Void> newAckFuture) {
+        if (OP_COUNT_UPDATE.getAndIncrement(this) == 0) {
+            opFuture = new CompletableFuture<>();
+        }
+        // the opCount is always bigger than 0 if there is an exception,
+        // and then the opFuture will never be replaced.
+        newAckFuture.whenComplete((ignore, e) -> {
+            if (e != null) {
+                log.error("The transaction [{}:{}] get an exception when ack 
messages.",
+                        txnIdMostBits, txnIdLeastBits, e);
+                if (!hasOpsFailed) {
+                    hasOpsFailed = true;
+                }
+            }
+            CompletableFuture<Void> future = opFuture;
+            if (OP_COUNT_UPDATE.decrementAndGet(this) == 0) {
+                future.complete(null);
+            }
+        });
     }
 
     @Override
@@ -148,19 +190,13 @@ public class TransactionImpl implements Transaction , 
TimerTask {
         return checkState(State.OPEN, State.COMMITTING).thenCompose((value) -> 
{
             CompletableFuture<Void> commitFuture = new CompletableFuture<>();
             this.state = State.COMMITTING;
-<<<<<<< HEAD
-            allOpComplete().whenComplete((v, e) -> {
-                if (e != null) {
-                    abort().whenComplete((vx, ex) -> 
commitFuture.completeExceptionally(e));
-=======
             opFuture.whenComplete((v, e) -> {
                 if (hasOpsFailed) {
                     checkState(State.COMMITTING).thenCompose(__ -> 
internalAbort()).whenComplete((vx, ex) ->
                             commitFuture.completeExceptionally(
                                     new 
PulsarClientException.TransactionHasOperationFailedException()));
->>>>>>> 00d09cbbd2b ([fix][txn] Fix transaction is not aborted when send or 
ACK failed (#20055))
                 } else {
-                    tcClient.commitAsync(new TxnID(txnIdMostBits, 
txnIdLeastBits))
+                    tcClient.commitAsync(txnId)
                             .whenComplete((vx, ex) -> {
                                 if (ex != null) {
                                     if (ex instanceof 
TransactionNotFoundException
@@ -182,19 +218,8 @@ public class TransactionImpl implements Transaction , 
TimerTask {
     @Override
     public CompletableFuture<Void> abort() {
         timeout.cancel();
-<<<<<<< HEAD
-        return checkIfOpenOrAborting().thenCompose(value -> {
-            CompletableFuture<Void> abortFuture = new CompletableFuture<>();
-            this.state = State.ABORTING;
-            allOpComplete().whenComplete((v, e) -> {
-                if (e != null) {
-                    log.error(e.getMessage());
-                }
-                tcClient.abortAsync(new TxnID(txnIdMostBits, 
txnIdLeastBits)).whenComplete((vx, ex) -> {
-=======
         return checkState(State.OPEN, State.ABORTING).thenCompose(__ -> 
internalAbort());
     }
->>>>>>> 00d09cbbd2b ([fix][txn] Fix transaction is not aborted when send or 
ACK failed (#20055))
 
     private CompletableFuture<Void> internalAbort() {
         CompletableFuture<Void> abortFuture = new CompletableFuture<>();
@@ -221,7 +246,7 @@ public class TransactionImpl implements Transaction , 
TimerTask {
 
     @Override
     public TxnID getTxnID() {
-        return new TxnID(txnIdMostBits, txnIdLeastBits);
+        return this.txnId;
     }
 
     @Override
@@ -235,7 +260,7 @@ public class TransactionImpl implements Transaction , 
TimerTask {
         } else {
             completableFuture
                     .completeExceptionally(new InvalidTxnStatusException(
-                            new TxnID(txnIdMostBits, 
txnIdLeastBits).toString(), state.name(), State.OPEN.name()));
+                            txnId.toString(), state.name(), 
State.OPEN.name()));
             return false;
         }
     }
@@ -251,12 +276,4 @@ public class TransactionImpl implements Transaction , 
TimerTask {
                 + txnIdLeastBits + "] with unexpected state: " + 
actualState.name() + ", expect: "
                 + Arrays.toString(expectedStates)));
     }
-
-
-    private CompletableFuture<Void> allOpComplete() {
-        List<CompletableFuture<?>> futureList = new ArrayList<>();
-        futureList.addAll(sendFutureList);
-        futureList.addAll(ackFutureList);
-        return CompletableFuture.allOf(futureList.toArray(new 
CompletableFuture[0]));
-    }
 }

Reply via email to