This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 76499577102 MSQ: Fix issue where AUTO assignment would not respect
maxWorkerCount. (#16214)
76499577102 is described below
commit 7649957710219e186295ff6d71ddf724fc1ab76a
Author: Gian Merlino <[email protected]>
AuthorDate: Thu Mar 28 02:10:31 2024 -0700
MSQ: Fix issue where AUTO assignment would not respect maxWorkerCount.
(#16214)
WorkerAssignmentStrategy.AUTO was missing a check for maxWorkerCount
in the case where the inputs to a stage are not dynamically sliceable.
A common case here is when the inputs to a stage are other stages.
---
.../msq/input/stage/StripedReadablePartitions.java | 6 +-
.../druid/msq/kernel/WorkerAssignmentStrategy.java | 2 +-
.../msq/kernel/controller/WorkerInputsTest.java | 89 ++++++++++++++++++++++
3 files changed, 95 insertions(+), 2 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/stage/StripedReadablePartitions.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/stage/StripedReadablePartitions.java
index bf9fdaa152e..887970efdd3 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/stage/StripedReadablePartitions.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/stage/StripedReadablePartitions.java
@@ -41,7 +41,11 @@ public class StripedReadablePartitions implements ReadablePartitions
private final int numWorkers;
private final IntSortedSet partitionNumbers;
- StripedReadablePartitions(final int stageNumber, final int numWorkers, final IntSortedSet partitionNumbers)
+ /**
+ * Constructor. Most callers should use {@link ReadablePartitions#striped(int, int, int)} instead, which takes
+ * a partition count rather than a set of partition numbers.
+ */
+ public StripedReadablePartitions(final int stageNumber, final int numWorkers, final IntSortedSet partitionNumbers)
{
this.stageNumber = stageNumber;
this.numWorkers = numWorkers;
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java
index 8d1832be17f..18f1f821d9a 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java
@@ -90,7 +90,7 @@ public enum WorkerAssignmentStrategy
final IntSet inputStages = stageDef.getInputStageNumbers();
final OptionalInt maxInputStageWorkerCount =
inputStages.intStream().map(stageWorkerCountMap).max();
- final int workerCount = maxInputStageWorkerCount.orElse(1);
+ final int workerCount = Math.min(stageDef.getMaxWorkerCount(), maxInputStageWorkerCount.orElse(1));
return slicer.sliceStatic(inputSpec, workerCount);
}
}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java
index bfb4c8ca675..00ccfdee6c1 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java
@@ -20,7 +20,10 @@
package org.apache.druid.msq.kernel.controller;
import com.google.common.collect.ImmutableMap;
+import it.unimi.dsi.fastutil.ints.Int2IntAVLTreeMap;
import it.unimi.dsi.fastutil.ints.Int2IntMaps;
+import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap;
+import it.unimi.dsi.fastutil.ints.IntAVLTreeSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import it.unimi.dsi.fastutil.longs.LongList;
@@ -31,6 +34,11 @@ import org.apache.druid.msq.input.InputSpec;
import org.apache.druid.msq.input.InputSpecSlicer;
import org.apache.druid.msq.input.NilInputSlice;
import org.apache.druid.msq.input.SlicerUtils;
+import org.apache.druid.msq.input.stage.ReadablePartitions;
+import org.apache.druid.msq.input.stage.StageInputSlice;
+import org.apache.druid.msq.input.stage.StageInputSpec;
+import org.apache.druid.msq.input.stage.StageInputSpecSlicer;
+import org.apache.druid.msq.input.stage.StripedReadablePartitions;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory;
@@ -216,6 +224,87 @@ public class WorkerInputsTest
);
}
+ @Test
+ public void
test_auto_oneInputStageWithThreePartitionsAndTwoWorkers_fourWorkerMax()
+ {
+ final StageDefinition stageDef =
+ StageDefinition.builder(1)
+ .inputs(new StageInputSpec(0))
+ .maxWorkerCount(4)
+ .processorFactory(new
OffsetLimitFrameProcessorFactory(0, 0L))
+ .build(QUERY_ID);
+
+ final WorkerInputs inputs = WorkerInputs.create(
+ stageDef,
+ new Int2IntAVLTreeMap(ImmutableMap.of(0, 2)),
+ new StageInputSpecSlicer(
+ new Int2ObjectAVLTreeMap<>(ImmutableMap.of(0,
ReadablePartitions.striped(0, 2, 3)))
+ ),
+ WorkerAssignmentStrategy.AUTO,
+ Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
+ );
+
+ Assert.assertEquals(
+ ImmutableMap.<Integer, List<InputSlice>>builder()
+ .put(
+ 0,
+ Collections.singletonList(
+ new StageInputSlice(
+ 0,
+ new StripedReadablePartitions(0, 2, new
IntAVLTreeSet(new int[]{0, 2}))
+ )
+ )
+ )
+ .put(
+ 1,
+ Collections.singletonList(
+ new StageInputSlice(
+ 0,
+ new StripedReadablePartitions(0, 2, new
IntAVLTreeSet(new int[]{1}))
+ )
+ )
+ )
+ .build(),
+ inputs.assignmentsMap()
+ );
+ }
+
+ @Test
+ public void
test_auto_oneInputStageWithThreePartitionsAndTwoWorkers_oneWorkerMax()
+ {
+ final StageDefinition stageDef =
+ StageDefinition.builder(1)
+ .inputs(new StageInputSpec(0))
+ .maxWorkerCount(1)
+ .processorFactory(new
OffsetLimitFrameProcessorFactory(0, 0L))
+ .build(QUERY_ID);
+
+ final WorkerInputs inputs = WorkerInputs.create(
+ stageDef,
+ new Int2IntAVLTreeMap(ImmutableMap.of(0, 2)),
+ new StageInputSpecSlicer(
+ new Int2ObjectAVLTreeMap<>(ImmutableMap.of(0,
ReadablePartitions.striped(0, 2, 3)))
+ ),
+ WorkerAssignmentStrategy.AUTO,
+ Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
+ );
+
+ Assert.assertEquals(
+ ImmutableMap.<Integer, List<InputSlice>>builder()
+ .put(
+ 0,
+ Collections.singletonList(
+ new StageInputSlice(
+ 0,
+ new StripedReadablePartitions(0, 2, new
IntAVLTreeSet(new int[]{0, 1, 2}))
+ )
+ )
+ )
+ .build(),
+ inputs.assignmentsMap()
+ );
+ }
+
@Test
public void test_auto_threeBigInputs_fourWorkers()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]