This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a7d5c64aeb Move MSQ temporary storage to a runtime parameter instead 
of being configured from query context (#14061)
a7d5c64aeb is described below

commit a7d5c64aeb7c5d70935e5ed797c8312af17264db
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Tue Apr 18 16:56:51 2023 +0530

    Move MSQ temporary storage to a runtime parameter instead of being
    configured from query context (#14061)
    
    *   Adds new runtime parameter druid.indexer.task.tmpStorageBytesPerTask.
        This sets a limit on the amount of temporary disk space used by tasks.
        This limit is currently only respected by MSQ tasks.
    *   Removes query context parameters
        intermediateSuperSorterStorageMaxLocalBytes and
        composedIntermediateSuperSorterStorageEnabled. Composed intermediate
        super sorter storage (which was enabled by
        composedIntermediateSuperSorterStorageEnabled) is now enabled
        automatically if durableShuffleStorage is set to true.
        intermediateSuperSorterStorageMaxLocalBytes is calculated from the
        limit set by the runtime parameter
        druid.indexer.task.tmpStorageBytesPerTask.
---
 docs/configuration/index.md                        |   1 +
 docs/multi-stage-query/reference.md                |   6 +
 .../org/apache/druid/msq/exec/ControllerImpl.java  |  10 --
 .../java/org/apache/druid/msq/exec/WorkerImpl.java |  25 +++-
 .../druid/msq/exec/WorkerStorageParameters.java    | 158 +++++++++++++++++++++
 .../apache/druid/msq/guice/MSQIndexingModule.java  |   2 +
 .../msq/indexing/error/NotEnoughMemoryFault.java   |   2 +-
 .../error/NotEnoughTemporaryStorageFault.java      |  97 +++++++++++++
 .../querykit/BaseLeafFrameProcessorFactory.java    |   2 +-
 .../druid/msq/util/MultiStageQueryContext.java     |  30 ----
 .../org/apache/druid/msq/exec/WorkerImplTest.java  |   4 +-
 ...lTest.java => WorkerStorageParametersTest.java} |  32 ++---
 .../msq/indexing/error/MSQFaultSerdeTest.java      |   1 +
 .../org/apache/druid/msq/test/MSQTestBase.java     |   4 -
 .../druid/msq/test/MSQTestControllerContext.java   |  15 +-
 .../druid/indexing/common/config/TaskConfig.java   |  21 ++-
 .../indexing/common/config/TaskConfigBuilder.java  |  10 +-
 .../druid/indexing/overlord/TaskLifecycleTest.java |   1 +
 18 files changed, 342 insertions(+), 79 deletions(-)
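
Note on the storage calculation: the commit message says that
intermediateSuperSorterStorageMaxLocalBytes is now derived from
druid.indexer.task.tmpStorageBytesPerTask. The following is a minimal,
standalone sketch of that derivation, using the constants that appear in
WorkerStorageParameters in this commit (1 GB reserved for basic operations,
0.8 usable fraction, 1 GB minimum for the super sorter). The class name
TmpStorageSketch and its method names are hypothetical, and
IllegalStateException is only a stand-in for the MSQException wrapping
NotEnoughTemporaryStorageFault that the real code throws.

```java
public class TmpStorageSketch
{
  // Values copied from WorkerStorageParameters in this commit.
  static final double USABLE_FRACTION = 0.8;
  static final long BASIC_OPERATIONS_BYTES = 1_000_000_000L;
  static final long MIN_SUPER_SORTER_BYTES = 1_000_000_000L;

  // Smallest tmpStorageBytesPerTask that still leaves the super sorter its minimum.
  static long suggestedMinimumBytes()
  {
    return BASIC_OPERATIONS_BYTES + (long) (MIN_SUPER_SORTER_BYTES / USABLE_FRACTION);
  }

  // Returns the local byte limit for super sorter intermediate files,
  // or -1 when no limit applies (durable storage off, or limit disabled).
  static long superSorterLimit(long tmpStorageBytesPerTask, boolean durableShuffleStorage)
  {
    if (!durableShuffleStorage || tmpStorageBytesPerTask == -1) {
      return -1;
    }
    if (tmpStorageBytesPerTask <= 0) {
      throw new IllegalArgumentException("Temporary storage bytes should be > 0");
    }
    // Reserve space for basic operations, then take the usable fraction of the rest.
    long usable = (long) (USABLE_FRACTION * (tmpStorageBytesPerTask - BASIC_OPERATIONS_BYTES));
    if (usable < MIN_SUPER_SORTER_BYTES) {
      // Stand-in for MSQException(NotEnoughTemporaryStorageFault) in the real code.
      throw new IllegalStateException(
          "Not enough temporary storage: need at least " + suggestedMinimumBytes()
          + " bytes, configured " + tmpStorageBytesPerTask
      );
    }
    return usable;
  }

  public static void main(String[] args)
  {
    System.out.println(superSorterLimit(2_250_000_000L, true));
    System.out.println(suggestedMinimumBytes());
  }
}
```

With a configured limit of 2,250,000,000 bytes the sketch yields a super
sorter limit of 1,000,000,000 bytes, and the suggested minimum is
2,250,000,000 bytes. These are the same figures asserted in
WorkerStorageParametersTest below.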

diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 111667cb04..519abf5f6f 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1521,6 +1521,7 @@ Additional peon configs include:
 |`druid.indexer.task.restoreTasksOnRestart`|If true, MiddleManagers will 
attempt to stop tasks gracefully on shutdown and restore them on restart.|false|
 |`druid.indexer.task.ignoreTimestampSpecForDruidInputSource`|If true, tasks 
using the [Druid input source](../ingestion/native-batch-input-source.md) will 
ignore the provided timestampSpec, and will use the `__time` column of the 
input datasource. This option is provided for compatibility with ingestion 
specs written before Druid 0.22.0.|false|
 |`druid.indexer.task.storeEmptyColumns`|Boolean value for whether or not to 
store empty columns during ingestion. When set to true, Druid stores every 
column specified in the 
[`dimensionsSpec`](../ingestion/ingestion-spec.md#dimensionsspec). If you use 
schemaless ingestion and don't specify any dimensions to ingest, you must also 
set [`includeAllDimensions`](../ingestion/ingestion-spec.md#dimensionsspec) for 
Druid to store empty columns.<br/><br/>If you set `storeEmptyColumns` to false, 
 [...]
+|`druid.indexer.task.tmpStorageBytesPerTask`|Maximum number of bytes per task 
to be used to store temporary files on disk. This usage is split among all 
temporary storage usages for the task. An exception might be thrown if this 
limit is too low for the task or if this limit would be exceeded. This limit is 
currently respected only by MSQ tasks. Other types of tasks might exceed this 
limit. A value of -1 disables this limit.  |-1|
 |`druid.indexer.server.maxChatRequests`|Maximum number of concurrent requests 
served by a task's chat handler. Set to 0 to disable limiting.|0|
 
 If the peon is running in remote mode, there must be an Overlord up and 
running. Peons in remote mode can set the following configurations:
diff --git a/docs/multi-stage-query/reference.md 
b/docs/multi-stage-query/reference.md
index d2f8db55c1..f1569aaf62 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -337,6 +337,11 @@ cleaner can be scheduled to clean the directories 
corresponding to which there i
 the storage connector to work upon the durable storage. The durable storage 
location should only be utilized to store the output
 for cluster's MSQ tasks. If the location contains other files or directories, 
then they will get cleaned up as well.
 
+Enabling durable storage also enables the use of local disk to store temporary 
files, such as the intermediate files produced
+by the super sorter. The limit set by 
`druid.indexer.task.tmpStorageBytesPerTask` for maximum number of bytes of local
+storage to be used per task will be respected by MSQ tasks. If the configured 
limit is too low, `NotEnoughTemporaryStorageFault`
+may be thrown.
+
 ### Enable durable storage
 
 To enable durable storage, you need to set the following common service 
properties:
@@ -434,6 +439,7 @@ The following table describes error codes you may encounter 
in the `multiStageQu
 | <a name="error_TooManyWarnings">`TooManyWarnings`</a> | Exceeded the maximum 
allowed number of warnings of a particular type. | `rootErrorCode`: The error 
code corresponding to the exception that exceeded the required limit. <br /><br 
/>`maxWarnings`: Maximum number of warnings that are allowed for the 
corresponding `rootErrorCode`. |
 | <a name="error_TooManyWorkers">`TooManyWorkers`</a> | Exceeded the maximum 
number of simultaneously-running workers. See the [Limits](#limits) table for 
more details. | `workers`: The number of simultaneously running workers that 
exceeded a hard or soft limit. This may be larger than the number of workers in 
any one stage if multiple stages are running simultaneously. <br /><br 
/>`maxWorkers`: The hard or soft limit on workers that was exceeded. If this is 
lower than the hard limit (1, [...]
 | <a name="error_NotEnoughMemory">`NotEnoughMemory`</a> | Insufficient memory 
to launch a stage. | `suggestedServerMemory`: Suggested number of bytes of 
memory to allocate to a given process. <br /><br />`serverMemory`: The number 
of bytes of memory available to a single process.<br /><br />`usableMemory`: 
The number of usable bytes of memory for a single process.<br /><br 
/>`serverWorkers`: The number of workers running in a single process.<br /><br 
/>`serverThreads`: The number of thre [...]
+| <a name="error_NotEnoughTemporaryStorage">`NotEnoughTemporaryStorage`</a> | 
Insufficient temporary storage configured to launch a stage. This limit is set 
by the property `druid.indexer.task.tmpStorageBytesPerTask`. This property 
should be increased to the minimum suggested limit to resolve this.| 
`suggestedMinimumStorage`: Suggested number of bytes of temporary storage space 
to allocate to a given process. <br /><br />`configuredTemporaryStorage`: The 
number of bytes of storage curren [...]
 | <a name="error_WorkerFailed">`WorkerFailed`</a> | A worker task failed 
unexpectedly. | `errorMsg`<br /><br />`workerTaskId`: The ID of the worker 
task. |
 | <a name="error_WorkerRpcFailed">`WorkerRpcFailed`</a> | A remote procedure 
call to a worker task failed and could not recover. | `workerTaskId`: the id of 
the worker task |
 | <a name="error_UnknownError">`UnknownError`</a> | All other errors. | 
`message` |
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index fc737cd30a..c65dfc5498 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -597,16 +597,6 @@ public class ControllerImpl implements Controller
         .put(
             MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE,
             isDurableStorageEnabled
-        ).put(
-            
MultiStageQueryContext.CTX_COMPOSED_INTERMEDIATE_SUPER_SORTER_STORAGE,
-            
MultiStageQueryContext.isComposedIntermediateSuperSorterStorageEnabled(
-                task.getQuerySpec().getQuery().context()
-            )
-        ).put(
-            
MultiStageQueryContext.CTX_INTERMEDIATE_SUPER_SORTER_STORAGE_MAX_LOCAL_BYTES,
-            
MultiStageQueryContext.getIntermediateSuperSorterStorageMaxLocalBytes(
-                task.getQuerySpec().getQuery().context()
-            )
         ).put(
             MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED,
             maxParseExceptions
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
index e1505fd118..cb5881b3ba 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.msq.exec;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Suppliers;
@@ -179,6 +180,7 @@ public class WorkerImpl implements Worker
   private final ConcurrentHashMap<StageId, WorkerStageKernel> stageKernelMap = 
new ConcurrentHashMap<>();
   private final ByteTracker intermediateSuperSorterLocalStorageTracker;
   private final boolean durableStageStorageEnabled;
+  private final WorkerStorageParameters workerStorageParameters;
 
   /**
    * Set once in {@link #runTask} and never reassigned.
@@ -197,17 +199,31 @@ public class WorkerImpl implements Worker
   private volatile boolean controllerAlive = true;
 
   public WorkerImpl(MSQWorkerTask task, WorkerContext context)
+  {
+    this(
+        task,
+        context,
+        WorkerStorageParameters.createProductionInstance(
+            context.injector(),
+            
MultiStageQueryContext.isDurableStorageEnabled(QueryContext.of(task.getContext()))
 // If Durable Storage is enabled, then super sorter intermediate storage can 
be enabled.
+        )
+    );
+  }
+
+  @VisibleForTesting
+  public WorkerImpl(MSQWorkerTask task, WorkerContext context, 
WorkerStorageParameters workerStorageParameters)
   {
     this.task = task;
     this.context = context;
     this.selfDruidNode = context.selfNode();
     this.processorBouncer = context.processorBouncer();
-    this.intermediateSuperSorterLocalStorageTracker = new ByteTracker(
-        
MultiStageQueryContext.getIntermediateSuperSorterStorageMaxLocalBytes(QueryContext.of(task.getContext()))
-    );
     this.durableStageStorageEnabled = 
MultiStageQueryContext.isDurableStorageEnabled(
         QueryContext.of(task.getContext())
     );
+    this.workerStorageParameters = workerStorageParameters;
+
+    long maxBytes = 
workerStorageParameters.isIntermediateStorageLimitConfigured() ? 
workerStorageParameters.getIntermediateSuperSorterStorageMaxLocalBytes() : 
Long.MAX_VALUE;
+    this.intermediateSuperSorterLocalStorageTracker = new 
ByteTracker(maxBytes);
   }
 
   @Override
@@ -731,8 +747,7 @@ public class WorkerImpl implements Worker
     final FileOutputChannelFactory fileOutputChannelFactory =
         new FileOutputChannelFactory(fileChannelDirectory, frameSize, 
intermediateSuperSorterLocalStorageTracker);
 
-    if 
(MultiStageQueryContext.isComposedIntermediateSuperSorterStorageEnabled(QueryContext.of(task.getContext()))
-        && durableStageStorageEnabled) {
+    if (durableStageStorageEnabled && 
workerStorageParameters.isIntermediateStorageLimitConfigured()) {
       return new ComposingOutputChannelFactory(
           ImmutableList.of(
               fileOutputChannelFactory,
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerStorageParameters.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerStorageParameters.java
new file mode 100644
index 0000000000..38d7e28d32
--- /dev/null
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerStorageParameters.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.inject.Injector;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.msq.indexing.error.MSQException;
+import org.apache.druid.msq.indexing.error.NotEnoughTemporaryStorageFault;
+
+import java.util.Objects;
+
+/**
+ * Class for determining the amount of temporary disk space to allocate to 
various purposes, given the per-worker limit.
+ * Similar to {@link WorkerMemoryParameters}, but for temporary disk space.
+ *
+ * Currently only used to allocate disk space for intermediate output from 
super sorter storage, if intermediate super
+ * sorter storage is enabled.
+ *
+ * If it is enabled, keeps {@link #MINIMUM_BASIC_OPERATIONS_BYTES} for 
miscellaneous operations and
+ * configures the super sorter to use {@link 
#SUPER_SORTER_TMP_STORAGE_USABLE_FRACTION} of the remaining space for
+ * intermediate files. If this value is less than {@link 
#MINIMUM_SUPER_SORTER_TMP_STORAGE_BYTES},
+ * {@link NotEnoughTemporaryStorageFault} is thrown.
+ */
+public class WorkerStorageParameters
+{
+  /**
+   * Fraction of temporary worker storage that can be allocated to super 
sorter intermediate files.
+   */
+  private static final double SUPER_SORTER_TMP_STORAGE_USABLE_FRACTION = 0.8;
+
+  /**
+   * Fixed amount of temporary disc storage reserved for miscellaneous 
operations.
+   */
+  private static final long MINIMUM_BASIC_OPERATIONS_BYTES = 1_000_000_000L;
+
+  /**
+   * Minimum threshold for number of bytes required for intermediate files. If 
the number of bytes is less than this
+   * threshold and intermediate super sorter storage is enabled, {@link 
NotEnoughTemporaryStorageFault} is thrown.
+   */
+  private static final long MINIMUM_SUPER_SORTER_TMP_STORAGE_BYTES = 
1_000_000_000L;
+
+  private final long intermediateSuperSorterStorageMaxLocalBytes;
+
+  private WorkerStorageParameters(final long 
intermediateSuperSorterStorageMaxLocalBytes)
+  {
+    this.intermediateSuperSorterStorageMaxLocalBytes = 
intermediateSuperSorterStorageMaxLocalBytes;
+  }
+
+  public static WorkerStorageParameters createProductionInstance(
+      final Injector injector,
+      final boolean isIntermediateSuperSorterStorageEnabled
+  )
+  {
+    long tmpStorageBytesPerTask = 
injector.getInstance(TaskConfig.class).getTmpStorageBytesPerTask();
+    return createInstance(tmpStorageBytesPerTask, 
isIntermediateSuperSorterStorageEnabled);
+  }
+
+  @VisibleForTesting
+  public static WorkerStorageParameters createInstanceForTests(final long 
tmpStorageBytesPerTask)
+  {
+    return new WorkerStorageParameters(tmpStorageBytesPerTask);
+  }
+
+  /**
+   * Returns an object specifiying temporary disk-usage parameters
+   * @param tmpStorageBytesPerTask                  amount of disk space to be 
allocated per task for intermediate files.
+   * @param isIntermediateSuperSorterStorageEnabled whether intermediate super 
sorter storage is enabled
+   */
+  public static WorkerStorageParameters createInstance(
+      final long tmpStorageBytesPerTask,
+      final boolean isIntermediateSuperSorterStorageEnabled
+  )
+  {
+    if (!isIntermediateSuperSorterStorageEnabled || tmpStorageBytesPerTask == 
-1) {
+      return new WorkerStorageParameters(-1);
+    }
+
+    Preconditions.checkArgument(tmpStorageBytesPerTask > 0, "Temporary storage 
bytes passed: [%s] should be > 0", tmpStorageBytesPerTask);
+    long intermediateSuperSorterStorageMaxLocalBytes = 
computeUsableStorage(tmpStorageBytesPerTask);
+
+    if (intermediateSuperSorterStorageMaxLocalBytes < 
MINIMUM_SUPER_SORTER_TMP_STORAGE_BYTES) {
+      throw new MSQException(
+          new NotEnoughTemporaryStorageFault(
+              calculateSuggestedMinTemporaryStorage(),
+              tmpStorageBytesPerTask
+          )
+      );
+    }
+
+    return new 
WorkerStorageParameters(intermediateSuperSorterStorageMaxLocalBytes);
+  }
+
+  private static long computeUsableStorage(long tmpStorageBytesPerTask)
+  {
+    return (long) (SUPER_SORTER_TMP_STORAGE_USABLE_FRACTION * 
(tmpStorageBytesPerTask - MINIMUM_BASIC_OPERATIONS_BYTES));
+  }
+
+  private static long calculateSuggestedMinTemporaryStorage()
+  {
+    return MINIMUM_BASIC_OPERATIONS_BYTES + (long) 
(MINIMUM_SUPER_SORTER_TMP_STORAGE_BYTES / 
SUPER_SORTER_TMP_STORAGE_USABLE_FRACTION);
+  }
+
+  public long getIntermediateSuperSorterStorageMaxLocalBytes()
+  {
+    return intermediateSuperSorterStorageMaxLocalBytes;
+  }
+
+  public boolean isIntermediateStorageLimitConfigured()
+  {
+    return intermediateSuperSorterStorageMaxLocalBytes != -1;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    WorkerStorageParameters that = (WorkerStorageParameters) o;
+    return intermediateSuperSorterStorageMaxLocalBytes == 
that.intermediateSuperSorterStorageMaxLocalBytes;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(intermediateSuperSorterStorageMaxLocalBytes);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "WorkerStorageParameters{" +
+           "intermediateSuperSorterStorageMaxLocalBytes=" + 
intermediateSuperSorterStorageMaxLocalBytes +
+           '}';
+  }
+}
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java
index b10ee8d88f..8cc7ab35bd 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java
@@ -52,6 +52,7 @@ import 
org.apache.druid.msq.indexing.error.InsertTimeOutOfBoundsFault;
 import org.apache.druid.msq.indexing.error.InvalidNullByteFault;
 import org.apache.druid.msq.indexing.error.MSQFault;
 import org.apache.druid.msq.indexing.error.NotEnoughMemoryFault;
+import org.apache.druid.msq.indexing.error.NotEnoughTemporaryStorageFault;
 import org.apache.druid.msq.indexing.error.QueryNotSupportedFault;
 import org.apache.druid.msq.indexing.error.QueryRuntimeFault;
 import org.apache.druid.msq.indexing.error.RowTooLargeFault;
@@ -117,6 +118,7 @@ public class MSQIndexingModule implements DruidModule
       InsertTimeNullFault.class,
       InsertTimeOutOfBoundsFault.class,
       InvalidNullByteFault.class,
+      NotEnoughTemporaryStorageFault.class,
       NotEnoughMemoryFault.class,
       QueryNotSupportedFault.class,
       QueryRuntimeFault.class,
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/NotEnoughMemoryFault.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/NotEnoughMemoryFault.java
index 0ea447958e..ed30179306 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/NotEnoughMemoryFault.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/NotEnoughMemoryFault.java
@@ -47,7 +47,7 @@ public class NotEnoughMemoryFault extends BaseMSQFault
   {
     super(
         CODE,
-        "Not enough memory. Required al teast %,d bytes. (total = %,d bytes; 
usable = %,d bytes; server workers = %,d; server threads = %,d). Increase JVM 
memory with the -xmx option"
+        "Not enough memory. Required at least %,d bytes. (total = %,d bytes; 
usable = %,d bytes; server workers = %,d; server threads = %,d). Increase JVM 
memory with the -xmx option"
         + (serverWorkers > 1 ? " or reduce number of server workers" : ""),
         suggestedServerMemory,
         serverMemory,
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/NotEnoughTemporaryStorageFault.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/NotEnoughTemporaryStorageFault.java
new file mode 100644
index 0000000000..523bde42de
--- /dev/null
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/NotEnoughTemporaryStorageFault.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing.error;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+import java.util.Objects;
+
+@JsonTypeName(NotEnoughTemporaryStorageFault.CODE)
+public class NotEnoughTemporaryStorageFault extends BaseMSQFault
+{
+  static final String CODE = "NotEnoughTemporaryStorageFault";
+
+  private final long suggestedMinimumStorage;
+  private final long configuredTemporaryStorage;
+
+  @JsonCreator
+  public NotEnoughTemporaryStorageFault(
+      @JsonProperty("suggestedMinimumStorage") final long 
suggestedMinimumStorage,
+      @JsonProperty("configuredTemporaryStorage") final long 
configuredTemporaryStorage
+  )
+  {
+    super(
+        CODE,
+        "Not enough temporary storage space for intermediate files. Requires 
at least %,d bytes. (configured = %,d bytes). Increase the limit by increasing 
tmpStorageBytesPerTask or "
+        + "disable durable storage by setting the context parameter 
durableShuffleStorage as false.",
+        suggestedMinimumStorage,
+        configuredTemporaryStorage
+    );
+
+    this.suggestedMinimumStorage = suggestedMinimumStorage;
+    this.configuredTemporaryStorage = configuredTemporaryStorage;
+  }
+
+  @JsonProperty
+  public long getSuggestedMinimumStorage()
+  {
+    return suggestedMinimumStorage;
+  }
+
+  @JsonProperty
+  public long getConfiguredTemporaryStorage()
+  {
+    return configuredTemporaryStorage;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    NotEnoughTemporaryStorageFault that = (NotEnoughTemporaryStorageFault) o;
+    return suggestedMinimumStorage == that.suggestedMinimumStorage
+           && configuredTemporaryStorage == that.configuredTemporaryStorage;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(super.hashCode(), suggestedMinimumStorage, 
configuredTemporaryStorage);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "NotEnoughTemporaryStorageFault{" +
+           "suggestedMinimumStorage=" + suggestedMinimumStorage +
+           ", configuredTemporaryStorage=" + configuredTemporaryStorage +
+           '}';
+  }
+}
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java
index d0abbd80de..d57db72264 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java
@@ -75,7 +75,7 @@ public abstract class BaseLeafFrameProcessorFactory extends 
BaseFrameProcessorFa
   ) throws IOException
   {
     // BaseLeafFrameProcessorFactory is used for native Druid queries, where 
the following input cases can happen:
-    //   1) Union datasources: N nonbroadcast inputs, which are are treated as 
one big input
+    //   1) Union datasources: N nonbroadcast inputs, which are treated as one 
big input
     //   2) Join datasources: one nonbroadcast input, N broadcast inputs
     //   3) All other datasources: single input
 
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
index e1348d19cb..d42b6eb83d 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
@@ -51,13 +51,6 @@ import java.util.stream.Collectors;
  * List of context parameters not present in external docs:
  * <br></br>
  * <ol>
- * <li><b>composedIntermediateSuperSorterStorageEnabled</b>: Whether to enable 
automatic fallback to durable storage from
- * local storage for sorting's intermediate data. Requires to set-up 
`intermediateSuperSorterStorageMaxLocalBytes` limit
- * for local storage and durable shuffle storage feature as well. Default 
value is <b>false</b>.</li>
- *
- * <li><b>intermediateSuperSorterStorageMaxLocalBytes</b>: Whether to enable a 
byte limit on local storage for
- * sorting's intermediate data. If that limit is crossed,the task fails with 
{@link org.apache.druid.query.ResourceLimitExceededException}`.
- * Default value is <b>9223372036854775807</b> </li>
  *
  * <li><b>maxInputBytesPerWorker</b>: Should be used in conjunction with 
taskAssignment `auto` mode. When dividing the
  * input of a stage among the workers, this parameter determines the maximum 
size in bytes that are given to a single worker
@@ -97,13 +90,6 @@ public class MultiStageQueryContext
   public static final String CTX_CLUSTER_STATISTICS_MERGE_MODE = 
"clusterStatisticsMergeMode";
   public static final String DEFAULT_CLUSTER_STATISTICS_MERGE_MODE = 
ClusterStatisticsMergeMode.PARALLEL.toString();
 
-  public static final String 
CTX_INTERMEDIATE_SUPER_SORTER_STORAGE_MAX_LOCAL_BYTES =
-      "intermediateSuperSorterStorageMaxLocalBytes";
-  public static final String CTX_COMPOSED_INTERMEDIATE_SUPER_SORTER_STORAGE =
-      "composedIntermediateSuperSorterStorageEnabled";
-  private static final boolean 
DEFAULT_COMPOSED_INTERMEDIATE_SUPER_SORTER_STORAGE = false;
-  private static final long 
DEFAULT_INTERMEDIATE_SUPER_SORTER_STORAGE_MAX_LOCAL_BYTES = Long.MAX_VALUE;
-
   public static final String CTX_DESTINATION = "destination";
   private static final String DEFAULT_DESTINATION = null;
 
@@ -157,22 +143,6 @@ public class MultiStageQueryContext
     );
   }
 
-  public static boolean isComposedIntermediateSuperSorterStorageEnabled(final 
QueryContext queryContext)
-  {
-    return queryContext.getBoolean(
-        CTX_COMPOSED_INTERMEDIATE_SUPER_SORTER_STORAGE,
-        DEFAULT_COMPOSED_INTERMEDIATE_SUPER_SORTER_STORAGE
-    );
-  }
-
-  public static long getIntermediateSuperSorterStorageMaxLocalBytes(final 
QueryContext queryContext)
-  {
-    return queryContext.getLong(
-        CTX_INTERMEDIATE_SUPER_SORTER_STORAGE_MAX_LOCAL_BYTES,
-        DEFAULT_INTERMEDIATE_SUPER_SORTER_STORAGE_MAX_LOCAL_BYTES
-    );
-  }
-
   public static ClusterStatisticsMergeMode 
getClusterStatisticsMergeMode(QueryContext queryContext)
   {
     return ClusterStatisticsMergeMode.valueOf(
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerImplTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerImplTest.java
index a6f67da0a9..171f476ebf 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerImplTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerImplTest.java
@@ -40,14 +40,14 @@ public class WorkerImplTest
   @Test
   public void testFetchStatsThrows()
   {
-    WorkerImpl worker = new WorkerImpl(new MSQWorkerTask("controller", "ds", 
1, new HashMap<>(), 0), workerContext);
+    WorkerImpl worker = new WorkerImpl(new MSQWorkerTask("controller", "ds", 
1, new HashMap<>(), 0), workerContext, 
WorkerStorageParameters.createInstanceForTests(Long.MAX_VALUE));
     Assert.assertThrows(ISE.class, () -> worker.fetchStatisticsSnapshot(new 
StageId("xx", 1)));
   }
 
   @Test
   public void testFetchStatsWithTimeChunkThrows()
   {
-    WorkerImpl worker = new WorkerImpl(new MSQWorkerTask("controller", "ds", 
1, new HashMap<>(), 0), workerContext);
+    WorkerImpl worker = new WorkerImpl(new MSQWorkerTask("controller", "ds", 
1, new HashMap<>(), 0), workerContext, 
WorkerStorageParameters.createInstanceForTests(Long.MAX_VALUE));
     Assert.assertThrows(ISE.class, () -> 
worker.fetchStatisticsSnapshotForTimeChunk(new StageId("xx", 1), 1L));
   }
 
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerImplTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerStorageParametersTest.java
similarity index 50%
copy from 
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerImplTest.java
copy to 
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerStorageParametersTest.java
index a6f67da0a9..9da8d323cf 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerImplTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerStorageParametersTest.java
@@ -19,36 +19,26 @@
 
 package org.apache.druid.msq.exec;
 
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.msq.indexing.MSQWorkerTask;
-import org.apache.druid.msq.kernel.StageId;
+import org.apache.druid.msq.indexing.error.MSQException;
+import org.apache.druid.msq.indexing.error.NotEnoughTemporaryStorageFault;
 import org.junit.Assert;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
 
-import java.util.HashMap;
-
-
-@RunWith(MockitoJUnitRunner.class)
-public class WorkerImplTest
+public class WorkerStorageParametersTest
 {
-  @Mock
-  WorkerContext workerContext;
-
   @Test
-  public void testFetchStatsThrows()
+  public void test_WorkerStorageParameter_createInstance()
   {
-    WorkerImpl worker = new WorkerImpl(new MSQWorkerTask("controller", "ds", 1, new HashMap<>(), 0), workerContext);
-    Assert.assertThrows(ISE.class, () -> worker.fetchStatisticsSnapshot(new StageId("xx", 1)));
+    Assert.assertEquals(WorkerStorageParameters.createInstanceForTests(1000000000), WorkerStorageParameters.createInstance(2_250_000_000L, true));
   }
 
   @Test
-  public void testFetchStatsWithTimeChunkThrows()
+  public void test_insufficientTemporaryStorage()
   {
-    WorkerImpl worker = new WorkerImpl(new MSQWorkerTask("controller", "ds", 1, new HashMap<>(), 0), workerContext);
-    Assert.assertThrows(ISE.class, () -> worker.fetchStatisticsSnapshotForTimeChunk(new StageId("xx", 1), 1L));
+    final MSQException e = Assert.assertThrows(
+        MSQException.class,
+        () -> WorkerStorageParameters.createInstance(2_000L, true)
+    );
+    Assert.assertEquals(new NotEnoughTemporaryStorageFault(2250000000L, 2000), e.getFault());
   }
-
 }
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
index 8342510c76..b46224d327 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
@@ -81,6 +81,7 @@ public class MSQFaultSerdeTest
     assertFaultSerde(UnknownFault.forMessage("the message"));
     assertFaultSerde(new WorkerFailedFault("the worker task", "the error msg"));
     assertFaultSerde(new WorkerRpcFailedFault("the worker task"));
+    assertFaultSerde(new NotEnoughTemporaryStorageFault(250, 2));
   }
 
   @Test
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index 6b50d84673..b037eecbb8 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -232,8 +232,6 @@ public class MSQTestBase extends BaseCalciteQueryTest
       ImmutableMap.<String, Object>builder()
                   .putAll(DEFAULT_MSQ_CONTEXT)
                   .put(MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE, true)
-                  .put(MultiStageQueryContext.CTX_COMPOSED_INTERMEDIATE_SUPER_SORTER_STORAGE, true)
-                  .put(MultiStageQueryContext.CTX_INTERMEDIATE_SUPER_SORTER_STORAGE_MAX_LOCAL_BYTES, 100) // added so that practically everything still goes to durable storage channel
                   .build();
 
 
@@ -241,8 +239,6 @@ public class MSQTestBase extends BaseCalciteQueryTest
       ImmutableMap.<String, Object>builder()
                   .putAll(DEFAULT_MSQ_CONTEXT)
                   .put(MultiStageQueryContext.CTX_FAULT_TOLERANCE, true)
-                  .put(MultiStageQueryContext.CTX_COMPOSED_INTERMEDIATE_SUPER_SORTER_STORAGE, true)
-                  .put(MultiStageQueryContext.CTX_INTERMEDIATE_SUPER_SORTER_STORAGE_MAX_LOCAL_BYTES, 100) // added so that practically everything still goes to durable storage channel
                   .build();
 
   public static final Map<String, Object> SEQUENTIAL_MERGE_MSQ_CONTEXT =
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
index efd3575c61..73cd900a5b 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
@@ -43,7 +43,10 @@ import org.apache.druid.msq.exec.WorkerClient;
 import org.apache.druid.msq.exec.WorkerImpl;
 import org.apache.druid.msq.exec.WorkerManagerClient;
 import org.apache.druid.msq.exec.WorkerMemoryParameters;
+import org.apache.druid.msq.exec.WorkerStorageParameters;
 import org.apache.druid.msq.indexing.MSQWorkerTask;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.query.QueryContext;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
 import org.mockito.ArgumentMatchers;
@@ -116,9 +119,19 @@ public class MSQTestControllerContext implements ControllerContext
       if (controller == null) {
         throw new ISE("Controller needs to be set using the register method");
       }
+
+      WorkerStorageParameters workerStorageParameters;
+      // If we are testing durable storage, set a low limit on storage so that the durable storage will be used.
+      if (MultiStageQueryContext.isDurableStorageEnabled(QueryContext.of(task.getContext()))) {
+        workerStorageParameters = WorkerStorageParameters.createInstanceForTests(100);
+      } else {
+        workerStorageParameters = WorkerStorageParameters.createInstanceForTests(Long.MAX_VALUE);
+      }
+
       Worker worker = new WorkerImpl(
           task,
-          new MSQTestWorkerContext(inMemoryWorkers, controller, mapper, injector, workerMemoryParameters)
+          new MSQTestWorkerContext(inMemoryWorkers, controller, mapper, injector, workerMemoryParameters),
+          workerStorageParameters
       );
       inMemoryWorkers.put(task.getId(), worker);
       statusMap.put(task.getId(), TaskStatus.running(task.getId()));
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
index dc46bb30cc..202cbd4ba5 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
@@ -78,6 +78,7 @@ public class TaskConfig
   private static final Period DEFAULT_DIRECTORY_LOCK_TIMEOUT = new Period("PT10M");
   private static final Period DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT = new Period("PT5M");
   private static final boolean DEFAULT_STORE_EMPTY_COLUMNS = true;
+  private static final long DEFAULT_TMP_STORAGE_BYTES_PER_TASK = -1;
 
   @JsonProperty
   private final String baseDir;
@@ -121,6 +122,9 @@ public class TaskConfig
   @JsonProperty
   private final boolean encapsulatedTask;
 
+  @JsonProperty
+  private final long tmpStorageBytesPerTask;
+
   @JsonCreator
   public TaskConfig(
       @JsonProperty("baseDir") String baseDir,
@@ -137,7 +141,8 @@ public class TaskConfig
       // deprecated, only set to true to fall back to older behavior
       @JsonProperty("batchProcessingMode") String batchProcessingMode,
       @JsonProperty("storeEmptyColumns") @Nullable Boolean storeEmptyColumns,
-      @JsonProperty("encapsulatedTask") boolean enableTaskLevelLogPush
+      @JsonProperty("encapsulatedTask") boolean enableTaskLevelLogPush,
+      @JsonProperty("tmpStorageBytesPerTask") @Nullable Long tmpStorageBytesPerTask
   )
   {
     this.baseDir = baseDir == null ? System.getProperty("java.io.tmpdir") : baseDir;
@@ -183,6 +188,7 @@ public class TaskConfig
     }
     log.debug("Batch processing mode:[%s]", this.batchProcessingMode);
     this.storeEmptyColumns = storeEmptyColumns == null ? DEFAULT_STORE_EMPTY_COLUMNS : storeEmptyColumns;
+    this.tmpStorageBytesPerTask = tmpStorageBytesPerTask == null ? DEFAULT_TMP_STORAGE_BYTES_PER_TASK : tmpStorageBytesPerTask;
   }
 
   private TaskConfig(
@@ -199,7 +205,8 @@ public class TaskConfig
       boolean batchMemoryMappedIndex,
       BatchProcessingMode batchProcessingMode,
       boolean storeEmptyColumns,
-      boolean encapsulatedTask
+      boolean encapsulatedTask,
+      long tmpStorageBytesPerTask
   )
   {
     this.baseDir = baseDir;
@@ -216,6 +223,7 @@ public class TaskConfig
     this.batchProcessingMode = batchProcessingMode;
     this.storeEmptyColumns = storeEmptyColumns;
     this.encapsulatedTask = encapsulatedTask;
+    this.tmpStorageBytesPerTask = tmpStorageBytesPerTask;
   }
 
   @JsonProperty
@@ -326,6 +334,12 @@ public class TaskConfig
     return encapsulatedTask;
   }
 
+  @JsonProperty
+  public long getTmpStorageBytesPerTask()
+  {
+    return tmpStorageBytesPerTask;
+  }
+
   private String defaultDir(@Nullable String configParameter, final String defaultVal)
   {
     if (configParameter == null) {
@@ -351,7 +365,8 @@ public class TaskConfig
         batchMemoryMappedIndex,
         batchProcessingMode,
         storeEmptyColumns,
-        encapsulatedTask
+        encapsulatedTask,
+        tmpStorageBytesPerTask
     );
   }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java
index 84b46949cd..af920ebbeb 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java
@@ -40,6 +40,7 @@ public class TaskConfigBuilder
   private String batchProcessingMode;
   private Boolean storeEmptyColumns;
   private boolean enableTaskLevelLogPush;
+  private Long tmpStorageBytesPerTask;
 
   public TaskConfigBuilder setBaseDir(String baseDir)
   {
@@ -125,6 +126,12 @@ public class TaskConfigBuilder
     return this;
   }
 
+  public TaskConfigBuilder setTmpStorageBytesPerTask(Long tmpStorageBytesPerTask)
+  {
+    this.tmpStorageBytesPerTask = tmpStorageBytesPerTask;
+    return this;
+  }
+
   public TaskConfig build()
   {
     return new TaskConfig(
@@ -141,7 +148,8 @@ public class TaskConfigBuilder
         batchMemoryMappedIndex,
         batchProcessingMode,
         storeEmptyColumns,
-        enableTaskLevelLogPush
+        enableTaskLevelLogPush,
+        tmpStorageBytesPerTask
     );
   }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index 5b06b5ffd1..585d52539d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -611,6 +611,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
         .setBaseDir(temporaryFolder.newFolder().toString())
         .setDefaultRowFlushBoundary(50000)
         .setBatchProcessingMode(TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name())
+        .setTmpStorageBytesPerTask(-1L)
         .build();
 
     return new TaskToolboxFactory(



