This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 51dfde0284 Add maxInputBytesPerWorker as query context parameter 
(#13707)
51dfde0284 is described below

commit 51dfde02840017092486fb75be2b16566aff6a19
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Tue Jan 31 20:55:28 2023 +0530

    Add maxInputBytesPerWorker as query context parameter (#13707)
    
    * Add maxInputBytesPerWorker as query context parameter
    
    * Move documentation to msq specific docs
    
    * Update tests
    
    * Spacing
    
    * Address review comments
    
    * Fix test
    
    * Update docs/multi-stage-query/reference.md
    
    * Correct spelling mistake
    
    ---------
    
    Co-authored-by: Karan Kumar <[email protected]>
---
 docs/multi-stage-query/reference.md                  |  1 +
 .../org/apache/druid/msq/exec/ControllerImpl.java    |  4 ++++
 .../main/java/org/apache/druid/msq/exec/Limits.java  |  2 +-
 .../org/apache/druid/msq/kernel/StageDefinition.java | 20 +++++++++++++++++---
 .../druid/msq/kernel/StageDefinitionBuilder.java     | 11 ++++++++++-
 .../druid/msq/kernel/WorkerAssignmentStrategy.java   |  4 ++--
 .../druid/msq/util/MultiStageQueryContext.java       | 10 ++++++++++
 .../apache/druid/msq/kernel/StageDefinitionTest.java | 10 +++++++---
 .../druid/msq/util/MultiStageQueryContextTest.java   | 10 ++++++++++
 9 files changed, 62 insertions(+), 10 deletions(-)

diff --git a/docs/multi-stage-query/reference.md 
b/docs/multi-stage-query/reference.md
index 2791e5caea..8de7507f52 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -602,6 +602,7 @@ The following table lists the context parameters for the 
MSQ task engine:
 | `faultTolerance` | SELECT, INSERT, REPLACE<br /><br /> Whether to turn on 
fault tolerance mode or not. Failed workers are retried based on 
[Limits](#limits). Cannot be used when `durableShuffleStorage` is explicitly 
set to false.  | `false` |
 | `composedIntermediateSuperSorterStorageEnabled` | SELECT, INSERT, REPLACE<br 
/><br /> Whether to enable automatic fallback to durable storage from local 
storage for sorting's intermediate data. Requires to setup 
`intermediateSuperSorterStorageMaxLocalBytes` limit for local storage and 
durable shuffle storage feature as well.| `false` |
 | `intermediateSuperSorterStorageMaxLocalBytes` | SELECT, INSERT, REPLACE<br 
/><br /> Whether to enable a byte limit on local storage for sorting's 
intermediate data. If that limit is crossed, the task fails with 
`ResourceLimitExceededException`.| `9223372036854775807` |
+| `maxInputBytesPerWorker` | Should be used in conjunction with taskAssignment 
`auto` mode. When dividing the input of a stage among the workers, this 
parameter determines the maximum size in bytes that is given to a single 
worker before the next worker is chosen. This parameter is only used as a 
guideline during input slicing, and does not guarantee that the input cannot 
be larger. For example, if we have 3 files of 3, 7, and 12 GB each, we would end up 
using 2 workers: worker 1 -> 3, 7 a [...]
 
 ## Sketch Merging Mode
 This section details the advantages and performance of various Cluster By 
Statistics Merge Modes.
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index d3257a72fb..e190edae9e 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -1599,10 +1599,14 @@ public class ControllerImpl implements Controller
       final DataSchema dataSchema =
           generateDataSchema(querySpec, querySignature, queryClusterBy, 
columnMappings, jsonMapper);
 
+      final long maxInputBytesPerWorker =
+          
MultiStageQueryContext.getMaxInputBytesPerWorker(querySpec.getQuery().context());
+
       builder.add(
           StageDefinition.builder(queryDef.getNextStageNumber())
                          .inputs(new 
StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber()))
                          .maxWorkerCount(tuningConfig.getMaxNumWorkers())
+                         .maxInputBytesPerWorker(maxInputBytesPerWorker)
                          .processorFactory(
                              new SegmentGeneratorFrameProcessorFactory(
                                  dataSchema,
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java
index 31a309afb1..8bce9a7ba5 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java
@@ -62,7 +62,7 @@ public class Limits
   /**
    * Maximum number of input bytes per worker in case number of tasks is 
determined automatically.
    */
-  public static final long MAX_INPUT_BYTES_PER_WORKER = 10 * 1024 * 1024 * 
1024L;
+  public static final long DEFAULT_MAX_INPUT_BYTES_PER_WORKER = 10 * 1024 * 
1024 * 1024L;
 
   /**
    * Maximum size of the kernel manipulation queue in {@link 
org.apache.druid.msq.indexing.MSQControllerTask}.
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
index 5fe3518af2..083bc167df 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
@@ -33,6 +33,7 @@ import org.apache.druid.frame.read.FrameReader;
 import org.apache.druid.java.util.common.Either;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.msq.exec.Limits;
 import org.apache.druid.msq.input.InputSpec;
 import org.apache.druid.msq.input.InputSpecs;
 import org.apache.druid.msq.statistics.ClusterByStatisticsCollector;
@@ -85,6 +86,7 @@ public class StageDefinition
   private final FrameProcessorFactory processorFactory;
   private final RowSignature signature;
   private final int maxWorkerCount;
+  private final long maxInputBytesPerWorker;
   private final boolean shuffleCheckHasMultipleValues;
 
   @Nullable
@@ -102,7 +104,8 @@ public class StageDefinition
       @JsonProperty("signature") final RowSignature signature,
       @Nullable @JsonProperty("shuffleSpec") final ShuffleSpec shuffleSpec,
       @JsonProperty("maxWorkerCount") final int maxWorkerCount,
-      @JsonProperty("shuffleCheckHasMultipleValues") final boolean 
shuffleCheckHasMultipleValues
+      @JsonProperty("shuffleCheckHasMultipleValues") final boolean 
shuffleCheckHasMultipleValues,
+      @JsonProperty("maxInputBytesPerWorker") final Long maxInputBytesPerWorker
   )
   {
     this.id = Preconditions.checkNotNull(id, "id");
@@ -122,6 +125,8 @@ public class StageDefinition
     this.maxWorkerCount = maxWorkerCount;
     this.shuffleCheckHasMultipleValues = shuffleCheckHasMultipleValues;
     this.frameReader = Suppliers.memoize(() -> 
FrameReader.create(signature))::get;
+    this.maxInputBytesPerWorker = maxInputBytesPerWorker == null ?
+                                  Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER : 
maxInputBytesPerWorker;
 
     if (shuffleSpec != null && shuffleSpec.needsStatistics() && 
shuffleSpec.getClusterBy().getColumns().isEmpty()) {
       throw new IAE("Cannot shuffle with spec [%s] and nil clusterBy", 
shuffleSpec);
@@ -241,6 +246,12 @@ public class StageDefinition
     return maxWorkerCount;
   }
 
+  @JsonProperty
+  public long getMaxInputBytesPerWorker()
+  {
+    return maxInputBytesPerWorker;
+  }
+
   @JsonProperty("shuffleCheckHasMultipleValues")
   @JsonInclude(JsonInclude.Include.NON_DEFAULT)
   boolean getShuffleCheckHasMultipleValues()
@@ -330,7 +341,8 @@ public class StageDefinition
            && Objects.equals(broadcastInputNumbers, that.broadcastInputNumbers)
            && Objects.equals(processorFactory, that.processorFactory)
            && Objects.equals(signature, that.signature)
-           && Objects.equals(shuffleSpec, that.shuffleSpec);
+           && Objects.equals(shuffleSpec, that.shuffleSpec)
+           && Objects.equals(maxInputBytesPerWorker, 
that.maxInputBytesPerWorker);
   }
 
   @Override
@@ -344,7 +356,8 @@ public class StageDefinition
         signature,
         maxWorkerCount,
         shuffleCheckHasMultipleValues,
-        shuffleSpec
+        shuffleSpec,
+        maxInputBytesPerWorker
     );
   }
 
@@ -360,6 +373,7 @@ public class StageDefinition
            ", maxWorkerCount=" + maxWorkerCount +
            ", shuffleSpec=" + shuffleSpec +
            (shuffleCheckHasMultipleValues ? ", shuffleCheckHasMultipleValues=" 
+ shuffleCheckHasMultipleValues : "") +
+           ", maxInputBytesPerWorker=" + maxInputBytesPerWorker +
            '}';
   }
 }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinitionBuilder.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinitionBuilder.java
index e1364eeb16..384b78fd33 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinitionBuilder.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinitionBuilder.java
@@ -21,6 +21,7 @@ package org.apache.druid.msq.kernel;
 
 import it.unimi.dsi.fastutil.ints.IntRBTreeSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.msq.exec.Limits;
 import org.apache.druid.msq.input.InputSpec;
 import org.apache.druid.segment.column.RowSignature;
 
@@ -42,6 +43,7 @@ public class StageDefinitionBuilder
   private int maxWorkerCount = 1;
   private ShuffleSpec shuffleSpec = null;
   private boolean shuffleCheckHasMultipleValues = false;
+  private long maxInputBytesPerWorker = 
Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER;
 
   /**
    * Package-private: callers should prefer {@link 
StageDefinition#builder(int)} rather than this constructor.
@@ -105,6 +107,12 @@ public class StageDefinitionBuilder
     return this;
   }
 
+  public StageDefinitionBuilder maxInputBytesPerWorker(final long 
maxInputBytesPerWorker)
+  {
+    this.maxInputBytesPerWorker = maxInputBytesPerWorker;
+    return this;
+  }
+
   int getStageNumber()
   {
     return stageNumber;
@@ -120,7 +128,8 @@ public class StageDefinitionBuilder
         signature,
         shuffleSpec,
         maxWorkerCount,
-        shuffleCheckHasMultipleValues
+        shuffleCheckHasMultipleValues,
+        maxInputBytesPerWorker
     );
   }
 }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java
index 0e947c77a5..c813778f45 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java
@@ -57,7 +57,7 @@ public enum WorkerAssignmentStrategy
 
   /**
    * Use the lowest possible number of tasks, while keeping each task's 
workload under
-   * {@link Limits#MAX_INPUT_FILES_PER_WORKER} files and {@link 
Limits#MAX_INPUT_BYTES_PER_WORKER} bytes.
+   * {@link Limits#MAX_INPUT_FILES_PER_WORKER} files and {@link 
StageDefinition#getMaxInputBytesPerWorker()} bytes.
    *
    * Implemented using {@link InputSpecSlicer#sliceDynamic} whenever possible.
    */
@@ -75,7 +75,7 @@ public enum WorkerAssignmentStrategy
             inputSpec,
             stageDef.getMaxWorkerCount(),
             Limits.MAX_INPUT_FILES_PER_WORKER,
-            Limits.MAX_INPUT_BYTES_PER_WORKER
+            stageDef.getMaxInputBytesPerWorker()
         );
       } else {
         // In auto mode, if we can't slice inputs dynamically, we instead 
carry forwards the number of workers from
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
index 9f69b1cd8e..b03c1111fe 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
@@ -26,6 +26,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.opencsv.RFC4180Parser;
 import com.opencsv.RFC4180ParserBuilder;
 import org.apache.druid.msq.exec.ClusterStatisticsMergeMode;
+import org.apache.druid.msq.exec.Limits;
 import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
 import org.apache.druid.msq.sql.MSQMode;
 import org.apache.druid.query.QueryContext;
@@ -63,6 +64,7 @@ public class MultiStageQueryContext
 
   public static final String CTX_FAULT_TOLERANCE = "faultTolerance";
   public static final boolean DEFAULT_FAULT_TOLERANCE = false;
+  public static final String CTX_MAX_INPUT_BYTES_PER_WORKER = 
"maxInputBytesPerWorker";
 
   public static final String CTX_CLUSTER_STATISTICS_MERGE_MODE = 
"clusterStatisticsMergeMode";
   public static final String DEFAULT_CLUSTER_STATISTICS_MERGE_MODE = 
ClusterStatisticsMergeMode.PARALLEL.toString();
@@ -115,6 +117,14 @@ public class MultiStageQueryContext
     );
   }
 
+  public static long getMaxInputBytesPerWorker(final QueryContext queryContext)
+  {
+    return queryContext.getLong(
+        CTX_MAX_INPUT_BYTES_PER_WORKER,
+        Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
+    );
+  }
+
   public static boolean isComposedIntermediateSuperSorterStorageEnabled(final 
QueryContext queryContext)
   {
     return queryContext.getBoolean(
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/StageDefinitionTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/StageDefinitionTest.java
index c4552e4afe..2257f04364 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/StageDefinitionTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/StageDefinitionTest.java
@@ -25,6 +25,7 @@ import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.druid.frame.key.ClusterBy;
 import org.apache.druid.frame.key.SortColumn;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.msq.exec.Limits;
 import org.apache.druid.msq.input.stage.StageInputSpec;
 import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory;
 import org.apache.druid.msq.statistics.ClusterByStatisticsCollectorImpl;
@@ -55,7 +56,8 @@ public class StageDefinitionTest
         RowSignature.empty(),
         null,
         0,
-        false
+        false,
+        Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
     );
 
     Assert.assertThrows(ISE.class, () -> 
stageDefinition.generatePartitionsForShuffle(null));
@@ -72,7 +74,8 @@ public class StageDefinitionTest
         RowSignature.empty(),
         new MaxCountShuffleSpec(new ClusterBy(ImmutableList.of(new 
SortColumn("test", false)), 1), 2, false),
         1,
-        false
+        false,
+        Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
     );
 
     Assert.assertThrows(ISE.class, () -> 
stageDefinition.generatePartitionsForShuffle(null));
@@ -90,7 +93,8 @@ public class StageDefinitionTest
         RowSignature.empty(),
         new MaxCountShuffleSpec(new ClusterBy(ImmutableList.of(new 
SortColumn("test", false)), 0), 1, false),
         1,
-        false
+        false,
+        Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
     );
 
     Assert.assertThrows(
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
index bfb6981c6d..7c4b2193a2 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
@@ -100,6 +100,16 @@ public class MultiStageQueryContextTest
     );
   }
 
+  @Test
+  public void testGetMaxInputBytesPerWorker()
+  {
+    Map<String, Object> propertyMap = 
ImmutableMap.of(MultiStageQueryContext.CTX_MAX_INPUT_BYTES_PER_WORKER, 1024);
+
+    Assert.assertEquals(
+        1024,
+        
MultiStageQueryContext.getMaxInputBytesPerWorker(QueryContext.of(propertyMap)));
+  }
+
   @Test
   public void getAssignmentStrategy_parameterSetReturnsCorrectValue()
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to