gianm commented on code in PR #13955:
URL: https://github.com/apache/druid/pull/13955#discussion_r1149885471
##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java:
##########
@@ -19,14 +19,369 @@
package org.apache.druid.msq.kernel.controller;
+import com.google.common.collect.ImmutableMap;
+import it.unimi.dsi.fastutil.ints.Int2IntMaps;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.longs.LongList;
import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.msq.input.InputSlice;
+import org.apache.druid.msq.input.InputSpec;
+import org.apache.druid.msq.input.InputSpecSlicer;
+import org.apache.druid.msq.input.NilInputSlice;
+import org.apache.druid.msq.input.SlicerUtils;
+import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
+import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory;
+import org.junit.Assert;
import org.junit.Test;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
public class WorkerInputsTest
{
+ private static final String QUERY_ID = "myQuery";
+
+ @Test
+ public void test_max_threeInputs_fourWorkers()
+ {
+ final StageDefinition stageDef =
+ StageDefinition.builder(0)
+ .inputs(new TestInputSpec(1, 2, 3))
+ .maxWorkerCount(4)
+ .processorFactory(new
OffsetLimitFrameProcessorFactory(0, 0L))
+ .build(QUERY_ID);
+
+ final WorkerInputs inputs = WorkerInputs.create(
+ stageDef,
+ Int2IntMaps.EMPTY_MAP,
+ new TestInputSpecSlicer(true),
+ WorkerAssignmentStrategy.MAX
+ );
+
+ Assert.assertEquals(
+ ImmutableMap.<Integer, List<InputSlice>>builder()
+ .put(0, Collections.singletonList(new TestInputSlice(1)))
+ .put(1, Collections.singletonList(new TestInputSlice(2)))
+ .put(2, Collections.singletonList(new TestInputSlice(3)))
+ .put(3, Collections.singletonList(new TestInputSlice()))
+ .build(),
+ inputs.assignmentsMap()
+ );
+ }
+
+ @Test
+ public void test_max_zeroInputs_fourWorkers()
+ {
+ final StageDefinition stageDef =
+ StageDefinition.builder(0)
+ .inputs(new TestInputSpec())
+ .maxWorkerCount(4)
+ .processorFactory(new
OffsetLimitFrameProcessorFactory(0, 0L))
+ .build(QUERY_ID);
+
+ final WorkerInputs inputs = WorkerInputs.create(
+ stageDef,
+ Int2IntMaps.EMPTY_MAP,
+ new TestInputSpecSlicer(true),
+ WorkerAssignmentStrategy.MAX
+ );
+
+ Assert.assertEquals(
+ ImmutableMap.<Integer, List<InputSlice>>builder()
+ .put(0, Collections.singletonList(new TestInputSlice()))
+ .put(1, Collections.singletonList(new TestInputSlice()))
+ .put(2, Collections.singletonList(new TestInputSlice()))
+ .put(3, Collections.singletonList(new TestInputSlice()))
+ .build(),
+ inputs.assignmentsMap()
+ );
+ }
+
+ @Test
+ public void test_auto_zeroInputSpecs_fourWorkers()
+ {
+ final StageDefinition stageDef =
+ StageDefinition.builder(0)
+ .inputs()
+ .maxWorkerCount(4)
+ .processorFactory(new
OffsetLimitFrameProcessorFactory(0, 0L))
+ .build(QUERY_ID);
+
+ final WorkerInputs inputs = WorkerInputs.create(
+ stageDef,
+ Int2IntMaps.EMPTY_MAP,
+ new TestInputSpecSlicer(true),
+ WorkerAssignmentStrategy.AUTO
+ );
+
+ Assert.assertEquals(
+ ImmutableMap.<Integer, List<InputSlice>>builder()
+ .put(0, Collections.singletonList(NilInputSlice.INSTANCE))
+ .build(),
+ inputs.assignmentsMap()
+ );
+ }
+
+ @Test
+ public void test_auto_zeroInputSlices_fourWorkers()
+ {
+ final StageDefinition stageDef =
+ StageDefinition.builder(0)
+ .inputs(new TestInputSpec())
+ .maxWorkerCount(4)
+ .processorFactory(new
OffsetLimitFrameProcessorFactory(0, 0L))
+ .build(QUERY_ID);
+
+ final WorkerInputs inputs = WorkerInputs.create(
+ stageDef,
+ Int2IntMaps.EMPTY_MAP,
+ new TestInputSpecSlicer(true),
+ WorkerAssignmentStrategy.AUTO
+ );
+
+ Assert.assertEquals(
+ ImmutableMap.<Integer, List<InputSlice>>builder()
+ .put(0, Collections.singletonList(NilInputSlice.INSTANCE))
+ .build(),
+ inputs.assignmentsMap()
+ );
+ }
+
+ @Test
+ public void test_auto_zeroInputSlices_broadcast_fourWorkers()
+ {
+ final StageDefinition stageDef =
+ StageDefinition.builder(0)
+ .inputs(new TestInputSpec())
+ .broadcastInputs(IntSet.of(0))
+ .maxWorkerCount(4)
+ .processorFactory(new
OffsetLimitFrameProcessorFactory(0, 0L))
+ .build(QUERY_ID);
+
+ final WorkerInputs inputs = WorkerInputs.create(
+ stageDef,
+ Int2IntMaps.EMPTY_MAP,
+ new TestInputSpecSlicer(true),
+ WorkerAssignmentStrategy.AUTO
+ );
+
+ Assert.assertEquals(
+ ImmutableMap.<Integer, List<InputSlice>>builder()
+ .put(0, Collections.singletonList(new TestInputSlice()))
+ .build(),
+ inputs.assignmentsMap()
+ );
+ }
+
+ @Test
+ public void test_auto_threeInputs_fourWorkers()
+ {
+ final StageDefinition stageDef =
+ StageDefinition.builder(0)
+ .inputs(new TestInputSpec(1, 2, 3))
+ .maxWorkerCount(4)
+ .processorFactory(new
OffsetLimitFrameProcessorFactory(0, 0L))
+ .build(QUERY_ID);
+
+ final WorkerInputs inputs = WorkerInputs.create(
+ stageDef,
+ Int2IntMaps.EMPTY_MAP,
+ new TestInputSpecSlicer(true),
+ WorkerAssignmentStrategy.AUTO
+ );
+
+ Assert.assertEquals(
+ ImmutableMap.<Integer, List<InputSlice>>builder()
+ .put(0, Collections.singletonList(new TestInputSlice(1, 2,
3)))
+ .build(),
+ inputs.assignmentsMap()
+ );
+ }
+
+ @Test
+ public void test_auto_threeBigInputs_fourWorkers()
+ {
+ final StageDefinition stageDef =
+ StageDefinition.builder(0)
+ .inputs(new TestInputSpec(4_000_000_000L,
4_000_000_001L, 4_000_000_002L))
+ .maxWorkerCount(4)
+ .processorFactory(new
OffsetLimitFrameProcessorFactory(0, 0L))
+ .build(QUERY_ID);
+
+ final WorkerInputs inputs = WorkerInputs.create(
+ stageDef,
+ Int2IntMaps.EMPTY_MAP,
+ new TestInputSpecSlicer(true),
+ WorkerAssignmentStrategy.AUTO
+ );
+
+ Assert.assertEquals(
+ ImmutableMap.<Integer, List<InputSlice>>builder()
+ .put(0, Collections.singletonList(new
TestInputSlice(4_000_000_000L, 4_000_000_001L)))
+ .put(1, Collections.singletonList(new
TestInputSlice(4_000_000_002L)))
+ .build(),
+ inputs.assignmentsMap()
+ );
+ }
+
+ @Test
+ public void test_auto_threeBigInputs_oneWorker()
Review Comment:
What do you mean by "more splits than the limit" and "more capacity than
available"?
I'll add a new test for many small files and one big one; it will be called
`test_auto_tenSmallAndOneBigInputs_twoWorkers`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]