This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new af164cbc10 Fix an issue with WorkerSketchFetcher not terminating on 
shutdown (#13459)
af164cbc10 is described below

commit af164cbc100185f33388759df9e866b468925c58
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Wed Nov 30 21:02:48 2022 +0530

    Fix an issue with WorkerSketchFetcher not terminating on shutdown (#13459)
    
    * Fix an issue with WorkerSketchFetcher not terminating on shutdown
    
    * Change threadpool name
---
 .../main/java/org/apache/druid/msq/exec/ControllerImpl.java  |  5 +++--
 .../java/org/apache/druid/msq/exec/WorkerSketchFetcher.java  | 12 +++++++++---
 2 files changed, 12 insertions(+), 5 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index cafc0f3892..318c33a759 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -524,6 +524,8 @@ public class ControllerImpl implements Controller
     context.registerController(this, closer);
 
     this.netClient = new 
ExceptionWrappingWorkerClient(context.taskClientFor(this));
+    closer.register(netClient::close);
+
     ClusterStatisticsMergeMode clusterStatisticsMergeMode =
         
MultiStageQueryContext.getClusterStatisticsMergeMode(task.getQuerySpec().getQuery().context());
 
@@ -532,8 +534,7 @@ public class ControllerImpl implements Controller
     int statisticsMaxRetainedBytes = 
WorkerMemoryParameters.createProductionInstanceForController(context.injector())
                                                                     
.getPartitionStatisticsMaxRetainedBytes();
     this.workerSketchFetcher = new WorkerSketchFetcher(netClient, 
clusterStatisticsMergeMode, statisticsMaxRetainedBytes);
-
-    closer.register(netClient::close);
+    closer.register(workerSketchFetcher::close);
 
     final boolean isDurableStorageEnabled =
         
MultiStageQueryContext.isDurableStorageEnabled(task.getQuerySpec().getQuery().context());
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java
index 3482b50daa..c4118a9d38 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java
@@ -25,6 +25,7 @@ import org.apache.druid.frame.key.ClusterByPartition;
 import org.apache.druid.frame.key.ClusterByPartitions;
 import org.apache.druid.java.util.common.Either;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.msq.kernel.StageDefinition;
 import org.apache.druid.msq.statistics.ClusterByStatisticsCollector;
@@ -39,13 +40,12 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.stream.IntStream;
 
 /**
  * Queues up fetching sketches from workers and progressively generates 
partitions boundaries.
  */
-public class WorkerSketchFetcher
+public class WorkerSketchFetcher implements AutoCloseable
 {
   private static final Logger log = new Logger(WorkerSketchFetcher.class);
   private static final int DEFAULT_THREAD_COUNT = 4;
@@ -63,7 +63,7 @@ public class WorkerSketchFetcher
   {
     this.workerClient = workerClient;
     this.clusterStatisticsMergeMode = clusterStatisticsMergeMode;
-    this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_COUNT);
+    this.executorService = Execs.multiThreaded(DEFAULT_THREAD_COUNT, 
"SketchFetcherThreadPool-%d");
     this.statisticsMaxRetainedBytes = statisticsMaxRetainedBytes;
   }
 
@@ -337,4 +337,10 @@ public class WorkerSketchFetcher
       return either.valueOrThrow().size();
     }
   }
+
+  @Override
+  public void close()
+  {
+    executorService.shutdownNow();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to