codelipenghui commented on a change in pull request #13348:
URL: https://github.com/apache/pulsar/pull/13348#discussion_r776656469



##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
##########
@@ -63,8 +63,14 @@
     private final TransactionCoordinatorClientImpl tcClient;
     private Map<ConsumerImpl<?>, Integer> cumulativeAckConsumers;
 
-    private final ArrayList<CompletableFuture<MessageId>> sendFutureList;
-    private final ArrayList<CompletableFuture<Void>> ackFutureList;
+    /**
+     *  The number of operations are executing  in this transaction.
+     */
+    private final AtomicLong opsExecutingInTxn = new AtomicLong(0);

Review comment:
       Using the AtomicUpdater to avoid creating many AtomicLong instances.

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
##########
@@ -148,8 +152,32 @@ public synchronized void 
registerSendOp(CompletableFuture<MessageId> sendFuture)
         }
     }
 
+    public synchronized <T> void registerInternal(CompletableFuture<T> 
completableFuture) {
+        //There have been an exception, it means this transaction should not 
be commit,
+        //so there is no need to record the execution of other operations
+        if (!executedFuture.isCompletedExceptionally()) {

Review comment:
       all the completed futures should not increase the `opsExecutingInTxn`

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
##########
@@ -148,8 +152,32 @@ public synchronized void 
registerSendOp(CompletableFuture<MessageId> sendFuture)
         }
     }
 
+    public synchronized <T> void registerInternal(CompletableFuture<T> 
completableFuture) {
+        //There have been an exception, it means this transaction should not 
be commit,
+        //so there is no need to record the execution of other operations
+        if (!executedFuture.isCompletedExceptionally()) {
+            opsExecutingInTxn.incrementAndGet();
+            executedFuture = new CompletableFuture<>();

Review comment:
       Can achieve the purpose by only using one instance? Looks like,
   
   - A new future was created when the transaction opened.
   - when all the ongoing requests are done, the opsExecutingInTxn change to 0, 
and then compete the future

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
##########
@@ -163,25 +191,25 @@ public synchronized void 
registerCumulativeAckConsumer(ConsumerImpl<?> consumer)
     public CompletableFuture<Void> commit() {
         return checkIfOpenOrCommitting().thenCompose((value) -> {
             CompletableFuture<Void> commitFuture = new CompletableFuture<>();
-            this.state = State.COMMITTING;
-            allOpComplete().whenComplete((v, e) -> {
-                if (e != null) {
-                    abort().whenComplete((vx, ex) -> 
commitFuture.completeExceptionally(e));
-                } else {
-                    tcClient.commitAsync(new TxnID(txnIdMostBits, 
txnIdLeastBits))
-                            .whenComplete((vx, ex) -> {
-                                if (ex != null) {
-                                    if (ex instanceof 
TransactionNotFoundException
-                                            || ex instanceof 
InvalidTxnStatusException) {
-                                        this.state = State.ERROR;
-                                    }
-                                    commitFuture.completeExceptionally(ex);
-                                } else {
-                                    this.state = State.COMMITTED;
-                                    commitFuture.complete(vx);
+            executedFuture.thenRun(() -> {

Review comment:
       Should check if the `executedFuture` is completed? If false, we should 
give feedback to the caller?

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
##########
@@ -148,8 +152,32 @@ public synchronized void 
registerSendOp(CompletableFuture<MessageId> sendFuture)
         }
     }
 
+    public synchronized <T> void registerInternal(CompletableFuture<T> 
completableFuture) {
+        //There have been an exception, it means this transaction should not 
be commit,
+        //so there is no need to record the execution of other operations
+        if (!executedFuture.isCompletedExceptionally()) {
+            opsExecutingInTxn.incrementAndGet();
+            executedFuture = new CompletableFuture<>();
+            completableFuture.thenRun(() -> {
+                synchronized (this) {

Review comment:
       Why need `synchronized` here?

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
##########
@@ -163,25 +191,25 @@ public synchronized void 
registerCumulativeAckConsumer(ConsumerImpl<?> consumer)
     public CompletableFuture<Void> commit() {
         return checkIfOpenOrCommitting().thenCompose((value) -> {
             CompletableFuture<Void> commitFuture = new CompletableFuture<>();
-            this.state = State.COMMITTING;
-            allOpComplete().whenComplete((v, e) -> {
-                if (e != null) {
-                    abort().whenComplete((vx, ex) -> 
commitFuture.completeExceptionally(e));
-                } else {
-                    tcClient.commitAsync(new TxnID(txnIdMostBits, 
txnIdLeastBits))
-                            .whenComplete((vx, ex) -> {
-                                if (ex != null) {
-                                    if (ex instanceof 
TransactionNotFoundException
-                                            || ex instanceof 
InvalidTxnStatusException) {
-                                        this.state = State.ERROR;
-                                    }
-                                    commitFuture.completeExceptionally(ex);
-                                } else {
-                                    this.state = State.COMMITTED;
-                                    commitFuture.complete(vx);
+            executedFuture.thenRun(() -> {
+                this.state = State.COMMITTING;
+                tcClient.commitAsync(new TxnID(txnIdMostBits, txnIdLeastBits))
+                        .whenComplete((vx, ex) -> {
+                            if (ex != null) {
+                                if (ex instanceof TransactionNotFoundException
+                                        || ex instanceof 
InvalidTxnStatusException) {
+                                    this.state = State.ERROR;
                                 }
-                            });
-                }
+                                commitFuture.completeExceptionally(ex);
+                            } else {
+                                this.state = State.COMMITTED;
+                                commitFuture.complete(vx);
+                            }
+                        });
+            }).exceptionally(e -> {
+                commitFuture.completeExceptionally(new PulsarClientException
+                        .TransactionCanNotCommitException(this.txnIdMostBits, 
this.txnIdLeastBits, e));

Review comment:
       ```suggestion
                   commitFuture.completeExceptionally(e.getCause());
   ```

##########
File path: 
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
##########
@@ -910,6 +910,28 @@ public TransactionConflictException(String msg) {
         }
     }
 
+    public static class TransactionCanNotCommitException extends 
PulsarClientException {

Review comment:
       Since both commit or abort might get this exception if users did not 
handle send message futures or ack message futures properly, I think we should 
change it to `TransactionOngoingRequestsNotCompleteException`?

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
##########
@@ -148,8 +152,32 @@ public synchronized void 
registerSendOp(CompletableFuture<MessageId> sendFuture)
         }
     }
 
+    public synchronized <T> void registerInternal(CompletableFuture<T> 
completableFuture) {
+        //There have been an exception, it means this transaction should not 
be commit,
+        //so there is no need to record the execution of other operations
+        if (!executedFuture.isCompletedExceptionally()) {
+            opsExecutingInTxn.incrementAndGet();
+            executedFuture = new CompletableFuture<>();
+            completableFuture.thenRun(() -> {
+                synchronized (this) {
+                    opsExecutingInTxn.decrementAndGet();
+                    // This is the last operation so far.
+                    if (opsExecutingInTxn.get() == 0) {
+                        executedFuture.complete(null);
+                    }

Review comment:
       ```suggestion
                       // This is the last operation so far.
                       if (opsExecutingInTxn.decrementAndGet() == 0) {
                           executedFuture.complete(null);
                       }
   ```

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
##########
@@ -148,8 +152,32 @@ public synchronized void 
registerSendOp(CompletableFuture<MessageId> sendFuture)
         }
     }
 
+    public synchronized <T> void registerInternal(CompletableFuture<T> 
completableFuture) {
+        //There have been an exception, it means this transaction should not 
be commit,
+        //so there is no need to record the execution of other operations
+        if (!executedFuture.isCompletedExceptionally()) {
+            opsExecutingInTxn.incrementAndGet();
+            executedFuture = new CompletableFuture<>();
+            completableFuture.thenRun(() -> {
+                synchronized (this) {
+                    opsExecutingInTxn.decrementAndGet();
+                    // This is the last operation so far.
+                    if (opsExecutingInTxn.get() == 0) {
+                        executedFuture.complete(null);
+                    }
+                }
+            }).exceptionally(e -> {
+                //Complete this future exceptionally and there is no need to 
executed this method again.
+                synchronized (this) {

Review comment:
       Why need `synchronized` here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to