congbobo184 commented on a change in pull request #13481:
URL: https://github.com/apache/pulsar/pull/13481#discussion_r791797179



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -500,58 +466,43 @@ private void addCommitTxnRequest(TxnID txnId, Map<String, 
Long> properties, long
                 return null;
             });
         }
-        return commitFuture;
-    }
-
-    private void addAbortTxnRequest(TxnID txnId, Consumer consumer, long 
lowWaterMark,
-                                    CompletableFuture<Void> completableFuture) 
{
-        acceptQueue.add(() -> abortTxn(txnId, consumer, lowWaterMark, 
true).thenAccept(v ->
-                completableFuture.complete(null)).exceptionally(e -> {
-            completableFuture.completeExceptionally(e);
-            return null;
-        }));
     }
 
     @Override
-    public synchronized CompletableFuture<Void> abortTxn(TxnID txnId, Consumer 
consumer,
-                                                         long lowWaterMark, 
boolean isInCacheRequest) {
-        if (!checkIfReady()) {
-            synchronized (PendingAckHandleImpl.this) {
+    public CompletableFuture<Void> commitTxn(TxnID txnID, Map<String, Long> 
properties, long lowWaterMark) {
+        CompletableFuture<Void> commitFuture = new CompletableFuture<>();
+        internalPinnedExecutor.execute(() -> {
+            if (!checkIfReady()) {
                 if (state == State.Initializing) {
-                    CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
-                    addAbortTxnRequest(txnId, consumer, lowWaterMark, 
completableFuture);
-                    return completableFuture;
+                    addCommitTxnRequest(txnID, properties, lowWaterMark, 
commitFuture);

Review comment:
       use switch  case

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -130,73 +142,40 @@ public PendingAckHandleImpl(PersistentSubscription 
persistentSubscription) {
 
     private void initPendingAckStore() {
         if (changeToInitializingState()) {
-            synchronized (PendingAckHandleImpl.this) {
-                if (!checkIfClose()) {
-                    this.pendingAckStoreFuture =
-                            
pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
-                    this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
-                        pendingAckStore.replayAsync(this,
-                                ((PersistentTopic) 
persistentSubscription.getTopic()).getBrokerService()
-                                        
.getPulsar().getTransactionReplayExecutor());
-                    }).exceptionally(e -> {
-                        acceptQueue.clear();
-                        changeToErrorState();
-                        exceptionHandleFuture(e.getCause());
-                        log.error("PendingAckHandleImpl init fail! TopicName : 
{}, SubName: {}", topicName, subName, e);
-                        return null;
-                    });
-                }
+            if (!checkIfClose()) {
+                this.pendingAckStoreFuture =
+                        
pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
+                this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
+                    pendingAckStore.replayAsync(this,
+                            (ScheduledExecutorService) internalPinnedExecutor);
+                }).exceptionally(e -> {
+                    acceptQueue.clear();
+                    changeToErrorState();
+                    log.error("PendingAckHandleImpl init fail! TopicName : {}, 
SubName: {}", topicName, subName, e);
+                    exceptionHandleFuture(e.getCause());

Review comment:
       when exceptionHandleFuture()  we should handle request witch in cache 
and don't clear the queue. may can implement it on next pr.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -611,6 +562,34 @@ private void addAbortTxnRequest(TxnID txnId, Consumer 
consumer, long lowWaterMar
         return abortFuture;
     }
 
+    @Override
+    public CompletableFuture<Void> abortTxn(TxnID txnId, Consumer consumer, 
long lowWaterMark) {
+        CompletableFuture<Void> abortFuture = new CompletableFuture<>();
+        internalPinnedExecutor.execute(() -> {
+            if (!checkIfReady()) {
+                if (state == State.Initializing) {

Review comment:
       use switch case is better

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
##########
@@ -309,11 +310,10 @@ public PulsarService(ServiceConfiguration config,
                 new DefaultThreadFactory("zk-cache-callback"));
 
         if (config.isTransactionCoordinatorEnabled()) {
-            this.transactionReplayExecutor = Executors.newScheduledThreadPool(
-                    config.getNumTransactionReplayThreadPoolSize(),
-                    new DefaultThreadFactory("transaction-replay"));
+            this.transactionExecutorProvider = new 
ExecutorProvider(this.getConfiguration().getNumIOThreads(),

Review comment:
       use config.getNumTransactionReplayThreadPoolSize()




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to