This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new c5835c29a1 Use durable super sorter intermediate storage only with
composable storage (#13748)
c5835c29a1 is described below
commit c5835c29a142861741cd546e26c7132df9b3698b
Author: Rohan Garg <[email protected]>
AuthorDate: Mon Feb 6 18:59:18 2023 +0530
Use durable super sorter intermediate storage only with composable storage
(#13748)
* This enables usage of durable storage connector only in case the
composable storage feature is enabled.
---
.../java/org/apache/druid/msq/exec/WorkerImpl.java | 45 ++++++++--------------
1 file changed, 16 insertions(+), 29 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
index f7c85b164b..3a8a037234 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
@@ -691,35 +691,22 @@ public class WorkerImpl implements Worker
final FileOutputChannelFactory fileOutputChannelFactory =
new FileOutputChannelFactory(fileChannelDirectory, frameSize,
intermediateSuperSorterLocalStorageTracker);
- if
(MultiStageQueryContext.isComposedIntermediateSuperSorterStorageEnabled(QueryContext.of(task.getContext())))
{
- if (durableStageStorageEnabled) {
- return new ComposingOutputChannelFactory(
- ImmutableList.of(
- fileOutputChannelFactory,
-
DurableStorageOutputChannelFactory.createStandardImplementation(
- task.getControllerTaskId(),
- task().getWorkerNumber(),
- stageNumber,
- task().getId(),
- frameSize,
- MSQTasks.makeStorageConnector(context.injector()),
- tmpDir
- )
- ),
- frameSize
- );
- } else {
- return fileOutputChannelFactory;
- }
- } else if (durableStageStorageEnabled) {
- return DurableStorageOutputChannelFactory.createStandardImplementation(
- task.getControllerTaskId(),
- task().getWorkerNumber(),
- stageNumber,
- task().getId(),
- frameSize,
- MSQTasks.makeStorageConnector(context.injector()),
- tmpDir
+ if
(MultiStageQueryContext.isComposedIntermediateSuperSorterStorageEnabled(QueryContext.of(task.getContext()))
&&
+ durableStageStorageEnabled) {
+ return new ComposingOutputChannelFactory(
+ ImmutableList.of(
+ fileOutputChannelFactory,
+ DurableStorageOutputChannelFactory.createStandardImplementation(
+ task.getControllerTaskId(),
+ task().getWorkerNumber(),
+ stageNumber,
+ task().getId(),
+ frameSize,
+ MSQTasks.makeStorageConnector(context.injector()),
+ tmpDir
+ )
+ ),
+ frameSize
);
} else {
return fileOutputChannelFactory;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]