This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 228afaa3df [multistage] separate leaf and intermediate threadpool 
(#10698)
228afaa3df is described below

commit 228afaa3df59df3698b014135fff27dac8c892e9
Author: Rong Rong <[email protected]>
AuthorDate: Fri Apr 28 22:32:35 2023 -0700

    [multistage] separate leaf and intermediate threadpool (#10698)
    
    * separate leaf and intermediate threadpool
    
    one stage type uses a pull model and the other a push model, so sharing one 
thread pool can cause starvation and distributed deadlock
    
    * address comments
    
    ---------
    
    Co-authored-by: Rong Rong <[email protected]>
---
 .../apache/pinot/query/runtime/QueryRunner.java    | 25 ++++++++++++++++------
 .../pinot/query/service/QueryServerTest.java       |  9 +++++---
 .../service/dispatch/QueryDispatcherTest.java      |  9 +++++---
 3 files changed, 30 insertions(+), 13 deletions(-)

diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index b30a1f9f93..cba39dc5bf 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.query.runtime;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -96,7 +97,8 @@ public class QueryRunner {
    *   </li>
    * </ol>
    */
-  private ExecutorService _queryWorkerExecutorService;
+  private ExecutorService _queryWorkerIntermExecutorService;
+  private ExecutorService _queryWorkerLeafExecutorService;
   /**
    * Query runner threads are used for:
    * <ol>
@@ -128,11 +130,14 @@ public class QueryRunner {
     try {
       long releaseMs = 
config.getProperty(QueryConfig.KEY_OF_SCHEDULER_RELEASE_TIMEOUT_MS,
           QueryConfig.DEFAULT_SCHEDULER_RELEASE_TIMEOUT_MS);
-      _queryWorkerExecutorService = 
Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS,
-          new NamedThreadFactory("query_worker_on_" + _port + "_port"));
+      _queryWorkerIntermExecutorService = 
Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS,
+          new NamedThreadFactory("query_intermediate_worker_on_" + _port + 
"_port"));
+      _queryWorkerLeafExecutorService = 
Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS,
+          new NamedThreadFactory("query_leaf_worker_on_" + _port + "_port"));
       _queryRunnerExecutorService = 
Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_RUNNER_THREADS,
           new NamedThreadFactory("query_runner_on_" + _port + "_port"));
-      _scheduler = new OpChainSchedulerService(new 
RoundRobinScheduler(releaseMs), _queryWorkerExecutorService);
+      _scheduler = new OpChainSchedulerService(new 
RoundRobinScheduler(releaseMs),
+          getQueryWorkerIntermExecutorService());
       _mailboxService = new MailboxService(_hostname, _port, config, 
_scheduler::onDataAvailable);
       _serverExecutor = new ServerQueryExecutorV1Impl();
       _serverExecutor.init(config.subset(PINOT_V1_SERVER_QUERY_CONFIG_PREFIX), 
instanceDataManager, serverMetrics);
@@ -177,8 +182,14 @@ public class QueryRunner {
     _scheduler.cancel(requestId);
   }
 
-  public ExecutorService getQueryWorkerExecutorService() {
-    return _queryWorkerExecutorService;
+  @VisibleForTesting
+  public ExecutorService getQueryWorkerLeafExecutorService() {
+    return _queryWorkerLeafExecutorService;
+  }
+
+  @VisibleForTesting
+  public ExecutorService getQueryWorkerIntermExecutorService() {
+    return _queryWorkerIntermExecutorService;
   }
 
   public ExecutorService getQueryRunnerExecutorService() {
@@ -204,7 +215,7 @@ public class QueryRunner {
       for (ServerPlanRequestContext requestContext : serverQueryRequests) {
         ServerQueryRequest request = new 
ServerQueryRequest(requestContext.getInstanceRequest(),
             new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), 
System.currentTimeMillis());
-        serverQueryResults.add(processServerQuery(request, 
_scheduler.getWorkerPool()));
+        serverQueryResults.add(processServerQuery(request, 
getQueryWorkerLeafExecutorService()));
       }
       LOGGER.debug(
           "RequestId:" + requestId + " StageId:" + 
distributedStagePlan.getStageId() + " Leaf stage v1 processing time:"
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
index fe0f54667b..5a996f0b9e 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
@@ -61,8 +61,10 @@ public class QueryServerTest extends QueryTestSet {
   private static final int QUERY_SERVER_COUNT = 2;
   private static final String KEY_OF_SERVER_INSTANCE_HOST = 
"pinot.query.runner.server.hostname";
   private static final String KEY_OF_SERVER_INSTANCE_PORT = 
"pinot.query.runner.server.port";
-  private static final ExecutorService WORKER_EXECUTOR_SERVICE = 
Executors.newFixedThreadPool(
-      ResourceManager.DEFAULT_QUERY_WORKER_THREADS, new 
NamedThreadFactory("QueryServerTest_Worker"));
+  private static final ExecutorService LEAF_WORKER_EXECUTOR_SERVICE = 
Executors.newFixedThreadPool(
+      ResourceManager.DEFAULT_QUERY_WORKER_THREADS, new 
NamedThreadFactory("QueryDispatcherTest_LeafWorker"));
+  private static final ExecutorService INTERM_WORKER_EXECUTOR_SERVICE = 
Executors.newFixedThreadPool(
+      ResourceManager.DEFAULT_QUERY_WORKER_THREADS, new 
NamedThreadFactory("QueryDispatcherTest_IntermWorker"));
   private static final ExecutorService RUNNER_EXECUTOR_SERVICE = 
Executors.newFixedThreadPool(
       ResourceManager.DEFAULT_QUERY_RUNNER_THREADS, new 
NamedThreadFactory("QueryServerTest_Runner"));
 
@@ -78,7 +80,8 @@ public class QueryServerTest extends QueryTestSet {
     for (int i = 0; i < QUERY_SERVER_COUNT; i++) {
       int availablePort = QueryTestUtils.getAvailablePort();
       QueryRunner queryRunner = Mockito.mock(QueryRunner.class);
-      
Mockito.when(queryRunner.getQueryWorkerExecutorService()).thenReturn(WORKER_EXECUTOR_SERVICE);
+      
Mockito.when(queryRunner.getQueryWorkerLeafExecutorService()).thenReturn(LEAF_WORKER_EXECUTOR_SERVICE);
+      
Mockito.when(queryRunner.getQueryWorkerIntermExecutorService()).thenReturn(INTERM_WORKER_EXECUTOR_SERVICE);
       
Mockito.when(queryRunner.getQueryRunnerExecutorService()).thenReturn(RUNNER_EXECUTOR_SERVICE);
       QueryServer queryServer = new QueryServer(availablePort, queryRunner);
       queryServer.start();
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
index af98b32d12..c7a1fbaf81 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
@@ -51,8 +51,10 @@ import org.testng.collections.Lists;
 public class QueryDispatcherTest extends QueryTestSet {
   private static final Random RANDOM_REQUEST_ID_GEN = new Random();
   private static final int QUERY_SERVER_COUNT = 2;
-  private static final ExecutorService WORKER_EXECUTOR_SERVICE = 
Executors.newFixedThreadPool(
-      ResourceManager.DEFAULT_QUERY_WORKER_THREADS, new 
NamedThreadFactory("QueryDispatcherTest_Worker"));
+  private static final ExecutorService LEAF_WORKER_EXECUTOR_SERVICE = 
Executors.newFixedThreadPool(
+      ResourceManager.DEFAULT_QUERY_WORKER_THREADS, new 
NamedThreadFactory("QueryDispatcherTest_LeafWorker"));
+  private static final ExecutorService INTERM_WORKER_EXECUTOR_SERVICE = 
Executors.newFixedThreadPool(
+      ResourceManager.DEFAULT_QUERY_WORKER_THREADS, new 
NamedThreadFactory("QueryDispatcherTest_IntermWorker"));
   private static final ExecutorService RUNNER_EXECUTOR_SERVICE = 
Executors.newFixedThreadPool(
       ResourceManager.DEFAULT_QUERY_RUNNER_THREADS, new 
NamedThreadFactory("QueryDispatcherTest_Runner"));
 
@@ -69,7 +71,8 @@ public class QueryDispatcherTest extends QueryTestSet {
     for (int i = 0; i < QUERY_SERVER_COUNT; i++) {
       int availablePort = QueryTestUtils.getAvailablePort();
       QueryRunner queryRunner = Mockito.mock(QueryRunner.class);;
-      
Mockito.when(queryRunner.getQueryWorkerExecutorService()).thenReturn(WORKER_EXECUTOR_SERVICE);
+      
Mockito.when(queryRunner.getQueryWorkerLeafExecutorService()).thenReturn(LEAF_WORKER_EXECUTOR_SERVICE);
+      
Mockito.when(queryRunner.getQueryWorkerIntermExecutorService()).thenReturn(INTERM_WORKER_EXECUTOR_SERVICE);
       
Mockito.when(queryRunner.getQueryRunnerExecutorService()).thenReturn(RUNNER_EXECUTOR_SERVICE);
       QueryServer queryServer = new QueryServer(availablePort, queryRunner);
       queryServer = Mockito.spy(queryServer);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to