This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new af164cbc10 Fix an issue with WorkerSketchFetcher not terminating on
shutdown (#13459)
af164cbc10 is described below
commit af164cbc100185f33388759df9e866b468925c58
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Wed Nov 30 21:02:48 2022 +0530
Fix an issue with WorkerSketchFetcher not terminating on shutdown (#13459)
* Fix an issue with WorkerSketchFetcher not terminating on shutdown
* Change threadpool name
---
.../main/java/org/apache/druid/msq/exec/ControllerImpl.java | 5 +++--
.../java/org/apache/druid/msq/exec/WorkerSketchFetcher.java | 12 +++++++++---
2 files changed, 12 insertions(+), 5 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index cafc0f3892..318c33a759 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -524,6 +524,8 @@ public class ControllerImpl implements Controller
context.registerController(this, closer);
this.netClient = new
ExceptionWrappingWorkerClient(context.taskClientFor(this));
+ closer.register(netClient::close);
+
ClusterStatisticsMergeMode clusterStatisticsMergeMode =
MultiStageQueryContext.getClusterStatisticsMergeMode(task.getQuerySpec().getQuery().context());
@@ -532,8 +534,7 @@ public class ControllerImpl implements Controller
int statisticsMaxRetainedBytes =
WorkerMemoryParameters.createProductionInstanceForController(context.injector())
.getPartitionStatisticsMaxRetainedBytes();
this.workerSketchFetcher = new WorkerSketchFetcher(netClient,
clusterStatisticsMergeMode, statisticsMaxRetainedBytes);
-
- closer.register(netClient::close);
+ closer.register(workerSketchFetcher::close);
final boolean isDurableStorageEnabled =
MultiStageQueryContext.isDurableStorageEnabled(task.getQuerySpec().getQuery().context());
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java
index 3482b50daa..c4118a9d38 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java
@@ -25,6 +25,7 @@ import org.apache.druid.frame.key.ClusterByPartition;
import org.apache.druid.frame.key.ClusterByPartitions;
import org.apache.druid.java.util.common.Either;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.statistics.ClusterByStatisticsCollector;
@@ -39,13 +40,12 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.stream.IntStream;
/**
* Queues up fetching sketches from workers and progressively generates
partitions boundaries.
*/
-public class WorkerSketchFetcher
+public class WorkerSketchFetcher implements AutoCloseable
{
private static final Logger log = new Logger(WorkerSketchFetcher.class);
private static final int DEFAULT_THREAD_COUNT = 4;
@@ -63,7 +63,7 @@ public class WorkerSketchFetcher
{
this.workerClient = workerClient;
this.clusterStatisticsMergeMode = clusterStatisticsMergeMode;
- this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_COUNT);
+ this.executorService = Execs.multiThreaded(DEFAULT_THREAD_COUNT,
"SketchFetcherThreadPool-%d");
this.statisticsMaxRetainedBytes = statisticsMaxRetainedBytes;
}
@@ -337,4 +337,10 @@ public class WorkerSketchFetcher
return either.valueOrThrow().size();
}
}
+
+ @Override
+ public void close()
+ {
+ executorService.shutdownNow();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]