congbobo184 commented on a change in pull request #13481:
URL: https://github.com/apache/pulsar/pull/13481#discussion_r776121803



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java
##########
@@ -44,7 +44,7 @@ public MLPendingAckReplyCallBack(PendingAckHandleImpl 
pendingAckHandle) {
 
     @Override
     public void replayComplete() {
-        synchronized (pendingAckHandle) {
+        pendingAckHandle.getInternalPinnedExecutor().submit(() -> {

Review comment:
       use execute method. if you use submit, it will return future, we don't 
need the future

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -291,46 +275,51 @@ private void addIndividualAcknowledgeMessageRequest(TxnID 
txnID,
         return completableFuture;
     }
 
-    private void addCumulativeAcknowledgeMessageRequest(TxnID txnID,
-                                                        List<PositionImpl> 
positions,
-                                                        
CompletableFuture<Void> completableFuture) {
-        acceptQueue.add(() -> cumulativeAcknowledgeMessage(txnID, positions, 
true).thenAccept(v ->
-                completableFuture.complete(null)).exceptionally(e -> {
-            completableFuture.completeExceptionally(e);
-            return null;
-        }));
-    }
-
     @Override
-    public CompletableFuture<Void> cumulativeAcknowledgeMessage(TxnID txnID,
-                                                                
List<PositionImpl> positions,
+    public CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID,
+                                                                
List<MutablePair<PositionImpl, Integer>> positions,
                                                                 boolean 
isInCacheRequest) {
-        if (!checkIfReady()) {
-            CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
-            synchronized (PendingAckHandleImpl.this) {
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        internalPinnedExecutor.submit(() -> {
+            if (!checkIfReady()) {
                 switch (state) {
                     case Initializing:
-                        addCumulativeAcknowledgeMessageRequest(txnID, 
positions, completableFuture);
-                        return completableFuture;
+                        addIndividualAcknowledgeMessageRequest(txnID, 
positions, completableFuture);
+                        return;
                     case None:
-                        addCumulativeAcknowledgeMessageRequest(txnID, 
positions, completableFuture);
+                        addIndividualAcknowledgeMessageRequest(txnID, 
positions, completableFuture);
                         initPendingAckStore();
-                        return completableFuture;
+                        return;
                     case Error:
                         completableFuture.completeExceptionally(
                                 new 
ServiceUnitNotReadyException("PendingAckHandle not replay error!"));
-                        return completableFuture;
+                        return;
                     case Close:
                         completableFuture.completeExceptionally(
                                 new 
ServiceUnitNotReadyException("PendingAckHandle have been closed!"));
-                        return completableFuture;
+                        return;
                     default:
                         break;
-
                 }
             }
-        }
+            internalIndividualAcknowledgeMessage(txnID, positions, 
completableFuture);
+        });
+        return completableFuture;
+    }
 
+    private void addCumulativeAcknowledgeMessageRequest(TxnID txnID,
+                                                        List<PositionImpl> 
positions,
+                                                        
CompletableFuture<Void> completableFuture) {
+        acceptQueue.add(() -> internalCumulativeAcknowledgeMessage(txnID, 
positions, new CompletableFuture<>())

Review comment:
       same above `addIndividualAcknowledgeMessageRequest `

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -610,6 +586,37 @@ private void addAbortTxnRequest(TxnID txnId, Consumer 
consumer, long lowWaterMar
         return abortFuture;
     }
 
+    @Override
+    public CompletableFuture<Void> abortTxn(TxnID txnId, Consumer consumer,
+                                                         long lowWaterMark, 
boolean isInCacheRequest) {
+        CompletableFuture<Void> abortFuture = new CompletableFuture<>();
+        internalPinnedExecutor.submit(() -> {

Review comment:
       use excute

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -305,6 +307,8 @@ public BrokerService(PulsarService pulsar, EventLoopGroup 
eventLoopGroup) throws
                 .newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("pulsar-stats-updater"));
         this.authorizationService = new AuthorizationService(
                 pulsar.getConfiguration(), pulsar().getPulsarResources());
+        this.internalExecutorService = new 
ExecutorProvider(pulsar.getConfiguration().getNumIOThreads(),

Review comment:
       now only transaction pending ack will use it, so when enable transaction 
we can init it 

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -126,76 +132,54 @@ public PendingAckHandleImpl(PersistentSubscription 
persistentSubscription) {
                 completeHandleFuture();
             }
         });
+
+        internalPinnedExecutor = persistentSubscription
+                .getTopic()
+                .getBrokerService()
+                .getInternalExecutorService()
+                .getExecutor();
     }
 
     private void initPendingAckStore() {
         if (changeToInitializingState()) {
-            synchronized (PendingAckHandleImpl.this) {
-                if (!checkIfClose()) {
-                    this.pendingAckStoreFuture =
-                            
pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
-                    this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
-                        pendingAckStore.replayAsync(this,
-                                ((PersistentTopic) 
persistentSubscription.getTopic()).getBrokerService()
-                                        
.getPulsar().getTransactionReplayExecutor());
-                    }).exceptionally(e -> {
-                        acceptQueue.clear();
-                        changeToErrorState();
-                        log.error("PendingAckHandleImpl init fail! TopicName : 
{}, SubName: {}", topicName, subName, e);
-                        return null;
-                    });
-                }
+            if (!checkIfClose()) {
+                this.pendingAckStoreFuture =
+                        
pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
+                this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
+                    pendingAckStore.replayAsync(this,
+                            
persistentSubscription.getTopic().getBrokerService()
+                                    
.getPulsar().getTransactionReplayExecutor());
+                }).exceptionally(e -> {
+                    acceptQueue.clear();
+                    changeToErrorState();
+                    log.error("PendingAckHandleImpl init fail! TopicName : {}, 
SubName: {}", topicName, subName, e);
+                    return null;
+                });
             }
         }
     }
 
     private void addIndividualAcknowledgeMessageRequest(TxnID txnID,
                                                         
List<MutablePair<PositionImpl, Integer>> positions,
                                                         
CompletableFuture<Void> completableFuture) {
-        acceptQueue.add(() -> individualAcknowledgeMessage(txnID, positions, 
true).thenAccept(v ->
-                completableFuture.complete(null)).exceptionally(e -> {
+        acceptQueue.add(() -> internalIndividualAcknowledgeMessage(txnID, 
positions, new CompletableFuture<>())

Review comment:
       why we need to create a CompletableFuture? If 
`internalIndividualAcknowledgeMessage` can handle completableFuture, we don't 
need to create a new future 

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -291,46 +275,51 @@ private void addIndividualAcknowledgeMessageRequest(TxnID 
txnID,
         return completableFuture;
     }
 
-    private void addCumulativeAcknowledgeMessageRequest(TxnID txnID,
-                                                        List<PositionImpl> 
positions,
-                                                        
CompletableFuture<Void> completableFuture) {
-        acceptQueue.add(() -> cumulativeAcknowledgeMessage(txnID, positions, 
true).thenAccept(v ->
-                completableFuture.complete(null)).exceptionally(e -> {
-            completableFuture.completeExceptionally(e);
-            return null;
-        }));
-    }
-
     @Override
-    public CompletableFuture<Void> cumulativeAcknowledgeMessage(TxnID txnID,
-                                                                
List<PositionImpl> positions,
+    public CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID,
+                                                                
List<MutablePair<PositionImpl, Integer>> positions,
                                                                 boolean 
isInCacheRequest) {
-        if (!checkIfReady()) {
-            CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
-            synchronized (PendingAckHandleImpl.this) {
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        internalPinnedExecutor.submit(() -> {

Review comment:
       use excute

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -394,56 +381,50 @@ private void addCumulativeAcknowledgeMessageRequest(TxnID 
txnID,
         return completableFuture;
     }
 
-    private void addCommitTxnRequest(TxnID txnId, Map<String, Long> 
properties, long lowWaterMark,
-                                    CompletableFuture<Void> completableFuture) 
{
-        acceptQueue.add(() -> commitTxn(txnId, properties, lowWaterMark, 
true).thenAccept(v ->
-                completableFuture.complete(null)).exceptionally(e -> {
-            completableFuture.completeExceptionally(e);
-            return null;
-        }));
-    }
-
     @Override
-    public synchronized CompletableFuture<Void> commitTxn(TxnID txnID, 
Map<String, Long> properties,
-                                                          long lowWaterMark, 
boolean isInCacheRequest) {
-        if (!checkIfReady()) {
-            synchronized (PendingAckHandleImpl.this) {
-                if (state == State.Initializing) {
-                    CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
-                    addCommitTxnRequest(txnID, properties, lowWaterMark, 
completableFuture);
-                    return completableFuture;
-                } else if (state == State.None) {
-                    CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
-                    addCommitTxnRequest(txnID, properties, lowWaterMark, 
completableFuture);
-                    initPendingAckStore();
-                    return completableFuture;
-                } else if (checkIfReady()) {
-
-                } else {
-                    if (state == State.Error) {
-                        return FutureUtil.failedFuture(
+    public CompletableFuture<Void> cumulativeAcknowledgeMessage(TxnID txnID,
+                                                                
List<PositionImpl> positions,
+                                                                boolean 
isInCacheRequest) {
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        internalPinnedExecutor.submit(() -> {

Review comment:
       use excute

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -502,55 +483,50 @@ private void addCommitTxnRequest(TxnID txnId, Map<String, 
Long> properties, long
         return commitFuture;
     }
 
-    private void addAbortTxnRequest(TxnID txnId, Consumer consumer, long 
lowWaterMark,
-                                    CompletableFuture<Void> completableFuture) 
{
-        acceptQueue.add(() -> abortTxn(txnId, consumer, lowWaterMark, 
true).thenAccept(v ->
-                completableFuture.complete(null)).exceptionally(e -> {
-            completableFuture.completeExceptionally(e);
-            return null;
-        }));
-    }
-
     @Override
-    public synchronized CompletableFuture<Void> abortTxn(TxnID txnId, Consumer 
consumer,
-                                                         long lowWaterMark, 
boolean isInCacheRequest) {
-        if (!checkIfReady()) {
-            synchronized (PendingAckHandleImpl.this) {
+    public CompletableFuture<Void> commitTxn(TxnID txnID, Map<String, Long> 
properties,
+                                                          long lowWaterMark, 
boolean isInCacheRequest) {
+        CompletableFuture<Void> commitFuture = new CompletableFuture<>();
+        internalPinnedExecutor.submit(() -> {
+            if (!checkIfReady()) {
                 if (state == State.Initializing) {
                     CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
-                    addAbortTxnRequest(txnId, consumer, lowWaterMark, 
completableFuture);
-                    return completableFuture;
+                    addCommitTxnRequest(txnID, properties, lowWaterMark, 
completableFuture);
+                    return;
                 } else if (state == State.None) {
                     CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
-                    addAbortTxnRequest(txnId, consumer, lowWaterMark, 
completableFuture);
+                    addCommitTxnRequest(txnID, properties, lowWaterMark, 
completableFuture);
                     initPendingAckStore();
-                    return completableFuture;
+                    return;
                 } else if (checkIfReady()) {
 
                 } else {
                     if (state == State.Error) {
-                        return FutureUtil.failedFuture(
+                        commitFuture.completeExceptionally(
                                 new 
ServiceUnitNotReadyException("PendingAckHandle not replay error!"));
                     } else {
-                        return FutureUtil.failedFuture(
+                        commitFuture.completeExceptionally(
                                 new 
ServiceUnitNotReadyException("PendingAckHandle have been closed!"));
                     }
+                    return;
                 }
             }
-        }
-
+            internalCommitTxn(txnID, properties, lowWaterMark, commitFuture);
+        });
+        return commitFuture;
+    }
 
-        if (!acceptQueue.isEmpty() && !isInCacheRequest) {
-            synchronized (PendingAckHandleImpl.this) {
-                if (!acceptQueue.isEmpty()) {
-                    CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
-                    addAbortTxnRequest(txnId, consumer, lowWaterMark, 
completableFuture);
-                    return completableFuture;
-                }
-            }
-        }
+    private void addAbortTxnRequest(TxnID txnId, Consumer consumer, long 
lowWaterMark,
+                                    CompletableFuture<Void> completableFuture) 
{
+        acceptQueue.add(() -> internalAbortTxn(txnId, consumer, lowWaterMark, 
new CompletableFuture<>())

Review comment:
       same above `addIndividualAcknowledgeMessageRequest `

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -394,56 +381,50 @@ private void addCumulativeAcknowledgeMessageRequest(TxnID 
txnID,
         return completableFuture;
     }
 
-    private void addCommitTxnRequest(TxnID txnId, Map<String, Long> 
properties, long lowWaterMark,
-                                    CompletableFuture<Void> completableFuture) 
{
-        acceptQueue.add(() -> commitTxn(txnId, properties, lowWaterMark, 
true).thenAccept(v ->
-                completableFuture.complete(null)).exceptionally(e -> {
-            completableFuture.completeExceptionally(e);
-            return null;
-        }));
-    }
-
     @Override
-    public synchronized CompletableFuture<Void> commitTxn(TxnID txnID, 
Map<String, Long> properties,
-                                                          long lowWaterMark, 
boolean isInCacheRequest) {
-        if (!checkIfReady()) {
-            synchronized (PendingAckHandleImpl.this) {
-                if (state == State.Initializing) {
-                    CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
-                    addCommitTxnRequest(txnID, properties, lowWaterMark, 
completableFuture);
-                    return completableFuture;
-                } else if (state == State.None) {
-                    CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
-                    addCommitTxnRequest(txnID, properties, lowWaterMark, 
completableFuture);
-                    initPendingAckStore();
-                    return completableFuture;
-                } else if (checkIfReady()) {
-
-                } else {
-                    if (state == State.Error) {
-                        return FutureUtil.failedFuture(
+    public CompletableFuture<Void> cumulativeAcknowledgeMessage(TxnID txnID,
+                                                                
List<PositionImpl> positions,
+                                                                boolean 
isInCacheRequest) {
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        internalPinnedExecutor.submit(() -> {
+            if (!checkIfReady()) {
+                switch (state) {
+                    case Initializing:
+                        addCumulativeAcknowledgeMessageRequest(txnID, 
positions, completableFuture);
+                        return;
+                    case None:
+                        addCumulativeAcknowledgeMessageRequest(txnID, 
positions, completableFuture);
+                        initPendingAckStore();
+                        return;
+                    case Error:
+                        completableFuture.completeExceptionally(
                                 new 
ServiceUnitNotReadyException("PendingAckHandle not replay error!"));
-                    } else {
-                        return FutureUtil.failedFuture(
+                        return;
+                    case Close:
+                        completableFuture.completeExceptionally(
                                 new 
ServiceUnitNotReadyException("PendingAckHandle have been closed!"));
-                    }
-
+                        return;
+                    default:
+                        break;
                 }
             }
-        }
+            internalCumulativeAcknowledgeMessage(txnID, positions, 
completableFuture);
+        });
 
-        if (!acceptQueue.isEmpty() && !isInCacheRequest) {
-            synchronized (PendingAckHandleImpl.this) {
-                if (!acceptQueue.isEmpty()) {
-                    CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
-                    addCommitTxnRequest(txnID, properties, lowWaterMark, 
completableFuture);
-                    return completableFuture;
-                }
-            }
-        }
+        return completableFuture;
+    }
 
-        CompletableFuture<Void> commitFuture = new CompletableFuture<>();
+    private void addCommitTxnRequest(TxnID txnId, Map<String, Long> 
properties, long lowWaterMark,

Review comment:
       same above `addIndividualAcknowledgeMessageRequest `




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to