codelipenghui commented on a change in pull request #13481:
URL: https://github.com/apache/pulsar/pull/13481#discussion_r776565752



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
##########
@@ -308,11 +309,10 @@ public PulsarService(ServiceConfiguration config,
                 new DefaultThreadFactory("zk-cache-callback"));
 
         if (config.isTransactionCoordinatorEnabled()) {
-            this.transactionReplayExecutor = Executors.newScheduledThreadPool(
-                    config.getNumTransactionReplayThreadPoolSize(),
-                    new DefaultThreadFactory("transaction-replay"));
+            this.internalExecutorService = new 
ExecutorProvider(this.getConfiguration().getNumIOThreads(),
+                    "pulsar-executorProvider-internal");

Review comment:
       ```suggestion
                       "pulsar-transaction-executor");
   ```
   
   Currently, no other domains use this executor provider, we can just give a 
name that is used by transactions.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
##########
@@ -252,7 +253,7 @@
     private PulsarResources pulsarResources;
 
     private TransactionPendingAckStoreProvider 
transactionPendingAckStoreProvider;
-    private final ScheduledExecutorService transactionReplayExecutor;
+    private final ExecutorProvider internalExecutorService;

Review comment:
       ```suggestion
       private final ExecutorProvider transactionExecutorProvider;
   ```

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -105,7 +105,7 @@ public TopicTransactionBuffer(PersistentTopic topic) {
         this.takeSnapshotIntervalTime = topic.getBrokerService().getPulsar()
                 
.getConfiguration().getTransactionBufferSnapshotMinTimeInMillis();
         this.maxReadPosition = (PositionImpl) 
topic.getManagedLedger().getLastConfirmedEntry();
-        
this.topic.getBrokerService().getPulsar().getTransactionReplayExecutor()
+        
this.topic.getBrokerService().getPulsar().getInternalExecutorService().getExecutor()

Review comment:
       We don't need a pinned executor here right?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
##########
@@ -252,7 +253,7 @@
     private PulsarResources pulsarResources;
 
     private TransactionPendingAckStoreProvider 
transactionPendingAckStoreProvider;
-    private final ScheduledExecutorService transactionReplayExecutor;
+    private final ExecutorProvider internalExecutorService;

Review comment:
       And we can remove `numTransactionReplayThreadPoolSize` in the 
ServiceConfiguration.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to