This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 51dfde0284 Add maxInputBytesPerWorker as query context parameter (#13707)
51dfde0284 is described below
commit 51dfde02840017092486fb75be2b16566aff6a19
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Tue Jan 31 20:55:28 2023 +0530
Add maxInputBytesPerWorker as query context parameter (#13707)
* Add maxInputBytesPerWorker as query context parameter
* Move documentation to msq specific docs
* Update tests
* Spacing
* Address review comments
* Fix test
* Update docs/multi-stage-query/reference.md
* Correct spelling mistake
---------
Co-authored-by: Karan Kumar <[email protected]>
---
docs/multi-stage-query/reference.md | 1 +
.../org/apache/druid/msq/exec/ControllerImpl.java | 4 ++++
.../main/java/org/apache/druid/msq/exec/Limits.java | 2 +-
.../org/apache/druid/msq/kernel/StageDefinition.java | 20 +++++++++++++++++---
.../druid/msq/kernel/StageDefinitionBuilder.java | 11 ++++++++++-
.../druid/msq/kernel/WorkerAssignmentStrategy.java | 4 ++--
.../druid/msq/util/MultiStageQueryContext.java | 10 ++++++++++
.../apache/druid/msq/kernel/StageDefinitionTest.java | 10 +++++++---
.../druid/msq/util/MultiStageQueryContextTest.java | 10 ++++++++++
9 files changed, 62 insertions(+), 10 deletions(-)
diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md
index 2791e5caea..8de7507f52 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -602,6 +602,7 @@ The following table lists the context parameters for the MSQ task engine:
| `faultTolerance` | SELECT, INSERT, REPLACE<br /><br /> Whether to turn on fault tolerance mode or not. Failed workers are retried based on [Limits](#limits). Cannot be used when `durableShuffleStorage` is explicitly set to false. | `false` |
| `composedIntermediateSuperSorterStorageEnabled` | SELECT, INSERT, REPLACE<br /><br /> Whether to enable automatic fallback to durable storage from local storage for sorting's intermediate data. Requires the `intermediateSuperSorterStorageMaxLocalBytes` limit to be set for local storage, and the durable shuffle storage feature to be enabled as well. | `false` |
| `intermediateSuperSorterStorageMaxLocalBytes` | SELECT, INSERT, REPLACE<br /><br /> Whether to enable a byte limit on local storage for sorting's intermediate data. If that limit is crossed, the task fails with `ResourceLimitExceededException`. | `9223372036854775807` |
+| `maxInputBytesPerWorker` | Should be used in conjunction with the taskAssignment `auto` mode. When dividing the input of a stage among the workers, this parameter determines the maximum size in bytes that is given to a single worker before the next worker is chosen. This parameter is only used as a guideline during input slicing, and does not guarantee that the input cannot be larger. For example, suppose we have 3 files of 3, 7, and 12 GB each; then we would end up using 2 workers: worker 1 -> 3, 7 a [...]
## Sketch Merging Mode
This section details the advantages and performance of various Cluster By Statistics Merge Modes.
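
As a quick aside on how this new parameter is consumed: below is a minimal sketch, not part of this commit, that sets `maxInputBytesPerWorker` in a query context and reads it back through the accessor added further down in this patch. It mirrors the unit test at the end of the diff; the 1 GiB value and the class name are illustrative only.

    import com.google.common.collect.ImmutableMap;
    import org.apache.druid.msq.util.MultiStageQueryContext;
    import org.apache.druid.query.QueryContext;

    public class MaxInputBytesPerWorkerExample
    {
      public static void main(String[] args)
      {
        // Set the new context key to 1 GiB, purely for illustration.
        final QueryContext context = QueryContext.of(
            ImmutableMap.of(MultiStageQueryContext.CTX_MAX_INPUT_BYTES_PER_WORKER, 1024L * 1024 * 1024)
        );
        // When the key is absent, the accessor falls back to
        // Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER (10 GiB).
        System.out.println(MultiStageQueryContext.getMaxInputBytesPerWorker(context));
      }
    }
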
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index d3257a72fb..e190edae9e 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -1599,10 +1599,14 @@ public class ControllerImpl implements Controller
final DataSchema dataSchema = generateDataSchema(querySpec, querySignature, queryClusterBy, columnMappings, jsonMapper);
+ final long maxInputBytesPerWorker =
+     MultiStageQueryContext.getMaxInputBytesPerWorker(querySpec.getQuery().context());
+
builder.add(
StageDefinition.builder(queryDef.getNextStageNumber())
.inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber()))
.maxWorkerCount(tuningConfig.getMaxNumWorkers())
+ .maxInputBytesPerWorker(maxInputBytesPerWorker)
.processorFactory(
new SegmentGeneratorFrameProcessorFactory(
dataSchema,
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java
index 31a309afb1..8bce9a7ba5 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java
@@ -62,7 +62,7 @@ public class Limits
/**
* Maximum number of input bytes per worker in case number of tasks is determined automatically.
*/
- public static final long MAX_INPUT_BYTES_PER_WORKER = 10 * 1024 * 1024 * 1024L;
+ public static final long DEFAULT_MAX_INPUT_BYTES_PER_WORKER = 10 * 1024 * 1024 * 1024L;
/**
* Maximum size of the kernel manipulation queue in {@link org.apache.druid.msq.indexing.MSQControllerTask}.
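
One note on the renamed constant: the arithmetic relies on the trailing `L`. A tiny self-contained check, assuming nothing beyond plain Java:

    public class DefaultLimitArithmetic
    {
      public static void main(String[] args)
      {
        // Evaluated left to right: 10 * 1024 * 1024 = 10485760 stays within int
        // range, and the final multiplication by 1024L promotes the result to
        // long before any overflow could occur.
        final long bytes = 10 * 1024 * 1024 * 1024L;
        System.out.println(bytes); // 10737418240 bytes, i.e. 10 GiB
      }
    }
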
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
index 5fe3518af2..083bc167df 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
@@ -33,6 +33,7 @@ import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.java.util.common.Either;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.input.InputSpec;
import org.apache.druid.msq.input.InputSpecs;
import org.apache.druid.msq.statistics.ClusterByStatisticsCollector;
@@ -85,6 +86,7 @@ public class StageDefinition
private final FrameProcessorFactory processorFactory;
private final RowSignature signature;
private final int maxWorkerCount;
+ private final long maxInputBytesPerWorker;
private final boolean shuffleCheckHasMultipleValues;
@Nullable
@@ -102,7 +104,8 @@ public class StageDefinition
@JsonProperty("signature") final RowSignature signature,
@Nullable @JsonProperty("shuffleSpec") final ShuffleSpec shuffleSpec,
@JsonProperty("maxWorkerCount") final int maxWorkerCount,
- @JsonProperty("shuffleCheckHasMultipleValues") final boolean
shuffleCheckHasMultipleValues
+ @JsonProperty("shuffleCheckHasMultipleValues") final boolean
shuffleCheckHasMultipleValues,
+ @JsonProperty("maxInputBytesPerWorker") final Long maxInputBytesPerWorker
)
{
this.id = Preconditions.checkNotNull(id, "id");
@@ -122,6 +125,8 @@ public class StageDefinition
this.maxWorkerCount = maxWorkerCount;
this.shuffleCheckHasMultipleValues = shuffleCheckHasMultipleValues;
this.frameReader = Suppliers.memoize(() -> FrameReader.create(signature))::get;
+ this.maxInputBytesPerWorker = maxInputBytesPerWorker == null ? Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER : maxInputBytesPerWorker;
if (shuffleSpec != null && shuffleSpec.needsStatistics() && shuffleSpec.getClusterBy().getColumns().isEmpty()) {
throw new IAE("Cannot shuffle with spec [%s] and nil clusterBy", shuffleSpec);
@@ -241,6 +246,12 @@ public class StageDefinition
return maxWorkerCount;
}
+ @JsonProperty
+ public long getMaxInputBytesPerWorker()
+ {
+ return maxInputBytesPerWorker;
+ }
+
@JsonProperty("shuffleCheckHasMultipleValues")
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
boolean getShuffleCheckHasMultipleValues()
@@ -330,7 +341,8 @@ public class StageDefinition
&& Objects.equals(broadcastInputNumbers, that.broadcastInputNumbers)
&& Objects.equals(processorFactory, that.processorFactory)
&& Objects.equals(signature, that.signature)
- && Objects.equals(shuffleSpec, that.shuffleSpec);
+ && Objects.equals(shuffleSpec, that.shuffleSpec)
+ && Objects.equals(maxInputBytesPerWorker,
that.maxInputBytesPerWorker);
}
@Override
@@ -344,7 +356,8 @@ public class StageDefinition
signature,
maxWorkerCount,
shuffleCheckHasMultipleValues,
- shuffleSpec
+ shuffleSpec,
+ maxInputBytesPerWorker
);
}
@@ -360,6 +373,7 @@ public class StageDefinition
", maxWorkerCount=" + maxWorkerCount +
", shuffleSpec=" + shuffleSpec +
(shuffleCheckHasMultipleValues ? ", shuffleCheckHasMultipleValues=" + shuffleCheckHasMultipleValues : "") +
+ ", maxInputBytesPerWorker=" + maxInputBytesPerWorker +
'}';
}
}
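
Note that the constructor takes a boxed `Long` rather than a primitive `long`: stage definitions serialized before this change carry no `maxInputBytesPerWorker` field, so Jackson passes `null` and the constructor substitutes the default. A hypothetical toy class, not the real `StageDefinition`, demonstrating that round-trip behavior:

    import com.fasterxml.jackson.annotation.JsonCreator;
    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class NullableDefaultExample
    {
      static final long DEFAULT = 10L * 1024 * 1024 * 1024;

      static class Stage
      {
        final long maxInputBytesPerWorker;

        @JsonCreator
        Stage(@JsonProperty("maxInputBytesPerWorker") final Long maxInputBytesPerWorker)
        {
          // Same null-coalescing pattern as the StageDefinition constructor above.
          this.maxInputBytesPerWorker =
              maxInputBytesPerWorker == null ? DEFAULT : maxInputBytesPerWorker;
        }
      }

      public static void main(String[] args) throws Exception
      {
        // A payload written before this commit omits the field entirely.
        final Stage stage = new ObjectMapper().readValue("{}", Stage.class);
        System.out.println(stage.maxInputBytesPerWorker); // 10737418240
      }
    }
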
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinitionBuilder.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinitionBuilder.java
index e1364eeb16..384b78fd33 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinitionBuilder.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinitionBuilder.java
@@ -21,6 +21,7 @@ package org.apache.druid.msq.kernel;
import it.unimi.dsi.fastutil.ints.IntRBTreeSet;
import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.input.InputSpec;
import org.apache.druid.segment.column.RowSignature;
@@ -42,6 +43,7 @@ public class StageDefinitionBuilder
private int maxWorkerCount = 1;
private ShuffleSpec shuffleSpec = null;
private boolean shuffleCheckHasMultipleValues = false;
+ private long maxInputBytesPerWorker = Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER;
/**
* Package-private: callers should prefer {@link StageDefinition#builder(int)} rather than this constructor.
@@ -105,6 +107,12 @@ public class StageDefinitionBuilder
return this;
}
+ public StageDefinitionBuilder maxInputBytesPerWorker(final long maxInputBytesPerWorker)
+ {
+ this.maxInputBytesPerWorker = maxInputBytesPerWorker;
+ return this;
+ }
+
int getStageNumber()
{
return stageNumber;
@@ -120,7 +128,8 @@ public class StageDefinitionBuilder
signature,
shuffleSpec,
maxWorkerCount,
- shuffleCheckHasMultipleValues
+ shuffleCheckHasMultipleValues,
+ maxInputBytesPerWorker
);
}
}
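
For completeness, the new setter chains like the existing ones. A sketch modeled on the ControllerImpl hunk earlier in this patch, with illustrative arguments; the helper method name is hypothetical:

    import org.apache.druid.msq.exec.Limits;
    import org.apache.druid.msq.kernel.StageDefinition;
    import org.apache.druid.msq.kernel.StageDefinitionBuilder;

    public class BuilderUsageSketch
    {
      // Returns a partially configured builder; inputs, a signature, and a
      // processor factory must still be supplied before build(), as in
      // ControllerImpl.
      static StageDefinitionBuilder configureStage(final int stageNumber, final int maxWorkers)
      {
        return StageDefinition.builder(stageNumber)
            .maxWorkerCount(maxWorkers)
            .maxInputBytesPerWorker(Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER);
      }
    }
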
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java
index 0e947c77a5..c813778f45 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java
@@ -57,7 +57,7 @@ public enum WorkerAssignmentStrategy
/**
* Use the lowest possible number of tasks, while keeping each task's workload under
- * {@link Limits#MAX_INPUT_FILES_PER_WORKER} files and {@link Limits#MAX_INPUT_BYTES_PER_WORKER} bytes.
+ * {@link Limits#MAX_INPUT_FILES_PER_WORKER} files and {@link StageDefinition#getMaxInputBytesPerWorker()} bytes.
*
* Implemented using {@link InputSpecSlicer#sliceDynamic} whenever possible.
*/
@@ -75,7 +75,7 @@ public enum WorkerAssignmentStrategy
inputSpec,
stageDef.getMaxWorkerCount(),
Limits.MAX_INPUT_FILES_PER_WORKER,
- Limits.MAX_INPUT_BYTES_PER_WORKER
+ stageDef.getMaxInputBytesPerWorker()
);
} else {
// In auto mode, if we can't slice inputs dynamically, we instead carry forwards the number of workers from
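
To make the guideline concrete, here is a standalone sketch, not the actual `InputSpecSlicer#sliceDynamic` implementation, of the greedy assignment described in the docs entry above: each worker receives files until adding the next one would cross `maxInputBytesPerWorker`, and a single oversized file still goes to one worker, which is why the limit is a guideline rather than a guarantee. With files of 3, 7, and 12 GB and the 10 GB default, two workers result: {3 GB, 7 GB} and {12 GB}.

    import java.util.ArrayList;
    import java.util.List;

    public class SliceSketch
    {
      static List<List<Long>> slice(final List<Long> fileSizes, final long maxBytesPerWorker)
      {
        final List<List<Long>> workers = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long currentBytes = 0;
        for (final long size : fileSizes) {
          // Start a new worker once the running total would exceed the limit,
          // but never leave a worker empty (an oversized file is still assigned).
          if (!current.isEmpty() && currentBytes + size > maxBytesPerWorker) {
            workers.add(current);
            current = new ArrayList<>();
            currentBytes = 0;
          }
          current.add(size);
          currentBytes += size;
        }
        if (!current.isEmpty()) {
          workers.add(current);
        }
        return workers;
      }

      public static void main(String[] args)
      {
        final long gb = 1024L * 1024 * 1024;
        // Prints two slices: [3 GB, 7 GB] and [12 GB], with sizes in bytes.
        System.out.println(slice(List.of(3 * gb, 7 * gb, 12 * gb), 10 * gb));
      }
    }
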
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
index 9f69b1cd8e..b03c1111fe 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
@@ -26,6 +26,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.opencsv.RFC4180Parser;
import com.opencsv.RFC4180ParserBuilder;
import org.apache.druid.msq.exec.ClusterStatisticsMergeMode;
+import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
import org.apache.druid.msq.sql.MSQMode;
import org.apache.druid.query.QueryContext;
@@ -63,6 +64,7 @@ public class MultiStageQueryContext
public static final String CTX_FAULT_TOLERANCE = "faultTolerance";
public static final boolean DEFAULT_FAULT_TOLERANCE = false;
+ public static final String CTX_MAX_INPUT_BYTES_PER_WORKER = "maxInputBytesPerWorker";
public static final String CTX_CLUSTER_STATISTICS_MERGE_MODE = "clusterStatisticsMergeMode";
public static final String DEFAULT_CLUSTER_STATISTICS_MERGE_MODE = ClusterStatisticsMergeMode.PARALLEL.toString();
@@ -115,6 +117,14 @@ public class MultiStageQueryContext
);
}
+ public static long getMaxInputBytesPerWorker(final QueryContext queryContext)
+ {
+ return queryContext.getLong(
+ CTX_MAX_INPUT_BYTES_PER_WORKER,
+ Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
+ );
+ }
+
public static boolean isComposedIntermediateSuperSorterStorageEnabled(final QueryContext queryContext)
{
return queryContext.getBoolean(
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/StageDefinitionTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/StageDefinitionTest.java
index c4552e4afe..2257f04364 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/StageDefinitionTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/StageDefinitionTest.java
@@ -25,6 +25,7 @@ import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.frame.key.SortColumn;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.input.stage.StageInputSpec;
import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory;
import org.apache.druid.msq.statistics.ClusterByStatisticsCollectorImpl;
@@ -55,7 +56,8 @@ public class StageDefinitionTest
RowSignature.empty(),
null,
0,
- false
+ false,
+ Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
);
Assert.assertThrows(ISE.class, () -> stageDefinition.generatePartitionsForShuffle(null));
@@ -72,7 +74,8 @@ public class StageDefinitionTest
RowSignature.empty(),
new MaxCountShuffleSpec(new ClusterBy(ImmutableList.of(new SortColumn("test", false)), 1), 2, false),
1,
- false
+ false,
+ Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
);
Assert.assertThrows(ISE.class, () -> stageDefinition.generatePartitionsForShuffle(null));
@@ -90,7 +93,8 @@ public class StageDefinitionTest
RowSignature.empty(),
new MaxCountShuffleSpec(new ClusterBy(ImmutableList.of(new SortColumn("test", false)), 0), 1, false),
1,
- false
+ false,
+ Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
);
Assert.assertThrows(
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
index bfb6981c6d..7c4b2193a2 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
@@ -100,6 +100,16 @@ public class MultiStageQueryContextTest
);
}
+ @Test
+ public void testGetMaxInputBytesPerWorker()
+ {
+ Map<String, Object> propertyMap =
ImmutableMap.of(MultiStageQueryContext.CTX_MAX_INPUT_BYTES_PER_WORKER, 1024);
+
+ Assert.assertEquals(
+ 1024,
+ MultiStageQueryContext.getMaxInputBytesPerWorker(QueryContext.of(propertyMap)));
+ }
+
@Test
public void getAssignmentStrategy_parameterSetReturnsCorrectValue()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]