This is an automated email from the ASF dual-hosted git repository.
lakshsingla pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 10bce22e68 Configure maxBytesPerWorker directly instead of using StageDefinition (#14257)
10bce22e68 is described below
commit 10bce22e680f590e8c07fff980fcbe6c7fb0ea33
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Mon May 15 16:51:57 2023 +0530
Configure maxBytesPerWorker directly instead of using StageDefinition (#14257)
* Configure maxBytesPerWorker directly instead of using StageDefinition
---
.../org/apache/druid/msq/exec/ControllerImpl.java | 10 +-
.../apache/druid/msq/kernel/StageDefinition.java | 20 +--
.../druid/msq/kernel/StageDefinitionBuilder.java | 11 +-
.../druid/msq/kernel/WorkerAssignmentStrategy.java | 27 ++--
.../kernel/controller/ControllerQueryKernel.java | 11 +-
.../kernel/controller/ControllerStageTracker.java | 5 +-
.../druid/msq/kernel/controller/WorkerInputs.java | 11 +-
.../org/apache/druid/msq/exec/MSQInsertTest.java | 109 +++++++++++++-
.../druid/msq/kernel/StageDefinitionTest.java | 10 +-
.../controller/BaseControllerQueryKernelTest.java | 4 +-
.../msq/kernel/controller/WorkerInputsTest.java | 156 +++++++++++++++++++--
.../org/apache/druid/msq/test/MSQTestBase.java | 18 +++
.../druid/msq/test/MSQTestControllerContext.java | 4 +-
.../multipleFiles/wikipedia-sampled-1.json | 7 +
.../multipleFiles/wikipedia-sampled-2.json | 7 +
.../multipleFiles/wikipedia-sampled-3.json | 6 +
16 files changed, 345 insertions(+), 71 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index a7aeffe0b1..b39249d811 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -1599,14 +1599,10 @@ public class ControllerImpl implements Controller
final DataSchema dataSchema =
generateDataSchema(querySpec, querySignature, queryClusterBy,
columnMappings, jsonMapper);
- final long maxInputBytesPerWorker =
-
MultiStageQueryContext.getMaxInputBytesPerWorker(querySpec.getQuery().context());
-
builder.add(
StageDefinition.builder(queryDef.getNextStageNumber())
.inputs(new
StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber()))
.maxWorkerCount(tuningConfig.getMaxNumWorkers())
- .maxInputBytesPerWorker(maxInputBytesPerWorker)
.processorFactory(
new SegmentGeneratorFrameProcessorFactory(
dataSchema,
@@ -2406,10 +2402,14 @@ public class ControllerImpl implements Controller
*/
private void startStages() throws IOException, InterruptedException
{
+ final long maxInputBytesPerWorker =
+
MultiStageQueryContext.getMaxInputBytesPerWorker(task.getQuerySpec().getQuery().context());
+
logKernelStatus(queryDef.getQueryId(), queryKernel);
final List<StageId> newStageIds = queryKernel.createAndGetNewStageIds(
inputSpecSlicerFactory,
- task.getQuerySpec().getAssignmentStrategy()
+ task.getQuerySpec().getAssignmentStrategy(),
+ maxInputBytesPerWorker
);
for (final StageId stageId : newStageIds) {
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
index 9892eae824..d333edb7d2 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
@@ -40,7 +40,6 @@ import org.apache.druid.frame.write.FrameWriters;
import org.apache.druid.java.util.common.Either;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.input.InputSpec;
import org.apache.druid.msq.input.InputSpecs;
import org.apache.druid.msq.statistics.ClusterByStatisticsCollector;
@@ -93,7 +92,6 @@ public class StageDefinition
private final FrameProcessorFactory processorFactory;
private final RowSignature signature;
private final int maxWorkerCount;
- private final long maxInputBytesPerWorker;
private final boolean shuffleCheckHasMultipleValues;
@Nullable
@@ -111,8 +109,7 @@ public class StageDefinition
@JsonProperty("signature") final RowSignature signature,
@Nullable @JsonProperty("shuffleSpec") final ShuffleSpec shuffleSpec,
@JsonProperty("maxWorkerCount") final int maxWorkerCount,
- @JsonProperty("shuffleCheckHasMultipleValues") final boolean
shuffleCheckHasMultipleValues,
- @JsonProperty("maxInputBytesPerWorker") final Long maxInputBytesPerWorker
+ @JsonProperty("shuffleCheckHasMultipleValues") final boolean
shuffleCheckHasMultipleValues
)
{
this.id = Preconditions.checkNotNull(id, "id");
@@ -132,8 +129,6 @@ public class StageDefinition
this.maxWorkerCount = maxWorkerCount;
this.shuffleCheckHasMultipleValues = shuffleCheckHasMultipleValues;
this.frameReader = Suppliers.memoize(() ->
FrameReader.create(signature))::get;
- this.maxInputBytesPerWorker = maxInputBytesPerWorker == null ?
- Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER :
maxInputBytesPerWorker;
if (mustGatherResultKeyStatistics() &&
shuffleSpec.clusterBy().getColumns().isEmpty()) {
throw new IAE("Cannot shuffle with spec [%s] and nil clusterBy",
shuffleSpec);
@@ -280,12 +275,6 @@ public class StageDefinition
return maxWorkerCount;
}
- @JsonProperty
- public long getMaxInputBytesPerWorker()
- {
- return maxInputBytesPerWorker;
- }
-
@JsonProperty("shuffleCheckHasMultipleValues")
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
boolean getShuffleCheckHasMultipleValues()
@@ -412,8 +401,7 @@ public class StageDefinition
&& Objects.equals(broadcastInputNumbers, that.broadcastInputNumbers)
&& Objects.equals(processorFactory, that.processorFactory)
&& Objects.equals(signature, that.signature)
- && Objects.equals(shuffleSpec, that.shuffleSpec)
- && Objects.equals(maxInputBytesPerWorker,
that.maxInputBytesPerWorker);
+ && Objects.equals(shuffleSpec, that.shuffleSpec);
}
@Override
@@ -427,8 +415,7 @@ public class StageDefinition
signature,
maxWorkerCount,
shuffleCheckHasMultipleValues,
- shuffleSpec,
- maxInputBytesPerWorker
+ shuffleSpec
);
}
@@ -444,7 +431,6 @@ public class StageDefinition
", maxWorkerCount=" + maxWorkerCount +
", shuffleSpec=" + shuffleSpec +
(shuffleCheckHasMultipleValues ? ", shuffleCheckHasMultipleValues="
+ shuffleCheckHasMultipleValues : "") +
- ", maxInputBytesPerWorker=" + maxInputBytesPerWorker +
'}';
}
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinitionBuilder.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinitionBuilder.java
index 3ff9594123..ebf5cfd709 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinitionBuilder.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinitionBuilder.java
@@ -21,7 +21,6 @@ package org.apache.druid.msq.kernel;
import it.unimi.dsi.fastutil.ints.IntRBTreeSet;
import it.unimi.dsi.fastutil.ints.IntSet;
-import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.input.InputSpec;
import org.apache.druid.segment.column.RowSignature;
@@ -43,7 +42,6 @@ public class StageDefinitionBuilder
private int maxWorkerCount = 1;
private ShuffleSpec shuffleSpec = null;
private boolean shuffleCheckHasMultipleValues = false;
- private long maxInputBytesPerWorker =
Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER;
/**
* Package-private: callers should prefer {@link
StageDefinition#builder(int)} rather than this constructor.
@@ -107,12 +105,6 @@ public class StageDefinitionBuilder
return this;
}
- public StageDefinitionBuilder maxInputBytesPerWorker(final long
maxInputBytesPerWorker)
- {
- this.maxInputBytesPerWorker = maxInputBytesPerWorker;
- return this;
- }
-
int getStageNumber()
{
return stageNumber;
@@ -133,8 +125,7 @@ public class StageDefinitionBuilder
signature,
shuffleSpec,
maxWorkerCount,
- shuffleCheckHasMultipleValues,
- maxInputBytesPerWorker
+ shuffleCheckHasMultipleValues
);
}
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java
index c813778f45..8d1832be17 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java
@@ -45,10 +45,11 @@ public enum WorkerAssignmentStrategy
MAX {
@Override
public List<InputSlice> assign(
- StageDefinition stageDef,
- InputSpec inputSpec,
- Int2IntMap stageWorkerCountMap,
- InputSpecSlicer slicer
+ final StageDefinition stageDef,
+ final InputSpec inputSpec,
+ final Int2IntMap stageWorkerCountMap,
+ final InputSpecSlicer slicer,
+ final long maxInputBytesPerSlice
)
{
return slicer.sliceStatic(inputSpec, stageDef.getMaxWorkerCount());
@@ -57,7 +58,7 @@ public enum WorkerAssignmentStrategy
/**
* Use the lowest possible number of tasks, while keeping each task's
workload under
- * {@link Limits#MAX_INPUT_FILES_PER_WORKER} files and {@link
StageDefinition#getMaxInputBytesPerWorker()} bytes.
+ * {@link Limits#MAX_INPUT_FILES_PER_WORKER} files and {@code
maxInputBytesPerWorker} bytes.
*
* Implemented using {@link InputSpecSlicer#sliceDynamic} whenever possible.
*/
@@ -67,7 +68,8 @@ public enum WorkerAssignmentStrategy
final StageDefinition stageDef,
final InputSpec inputSpec,
final Int2IntMap stageWorkerCountMap,
- final InputSpecSlicer slicer
+ final InputSpecSlicer slicer,
+ final long maxInputBytesPerSlice
)
{
if (slicer.canSliceDynamic(inputSpec)) {
@@ -75,7 +77,7 @@ public enum WorkerAssignmentStrategy
inputSpec,
stageDef.getMaxWorkerCount(),
Limits.MAX_INPUT_FILES_PER_WORKER,
- stageDef.getMaxInputBytesPerWorker()
+ maxInputBytesPerSlice
);
} else {
// In auto mode, if we can't slice inputs dynamically, we instead
carry forwards the number of workers from
@@ -110,10 +112,19 @@ public enum WorkerAssignmentStrategy
return StringUtils.toLowerCase(name());
}
+ /**
+ * @param stageDef current stage definition. Contains information on max
workers, input stage numbers
+ * @param inputSpec inputSpec containing information on where the input is
read from
+ * @param stageWorkerCountMap map of past stage number vs number of worker
inputs
+ * @param slicer creates slices of input spec based on other parameters
+ * @param maxInputBytesPerSlice maximum suggested bytes per input slice
+ * @return list containing input slices
+ */
public abstract List<InputSlice> assign(
StageDefinition stageDef,
InputSpec inputSpec,
Int2IntMap stageWorkerCountMap,
- InputSpecSlicer slicer
+ InputSpecSlicer slicer,
+ long maxInputBytesPerSlice
);
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
index ddd7b633b3..62a45c4eaa 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
@@ -162,7 +162,8 @@ public class ControllerQueryKernel
*/
public List<StageId> createAndGetNewStageIds(
final InputSpecSlicerFactory slicerFactory,
- final WorkerAssignmentStrategy assignmentStrategy
+ final WorkerAssignmentStrategy assignmentStrategy,
+ final long maxInputBytesPerWorker
)
{
final Int2IntMap stageWorkerCountMap = new Int2IntAVLTreeMap();
@@ -177,7 +178,7 @@ public class ControllerQueryKernel
}
}
- createNewKernels(stageWorkerCountMap,
slicerFactory.makeSlicer(stagePartitionsMap), assignmentStrategy);
+ createNewKernels(stageWorkerCountMap,
slicerFactory.makeSlicer(stagePartitionsMap), assignmentStrategy,
maxInputBytesPerWorker);
return stageTracker.values()
.stream()
.filter(controllerStageTracker ->
controllerStageTracker.getPhase() == ControllerStagePhase.NEW)
@@ -292,7 +293,8 @@ public class ControllerQueryKernel
private void createNewKernels(
final Int2IntMap stageWorkerCountMap,
final InputSpecSlicer slicer,
- final WorkerAssignmentStrategy assignmentStrategy
+ final WorkerAssignmentStrategy assignmentStrategy,
+ final long maxInputBytesPerWorker
)
{
for (final StageId nextStage : readyToRunStages) {
@@ -303,7 +305,8 @@ public class ControllerQueryKernel
stageWorkerCountMap,
slicer,
assignmentStrategy,
- maxRetainedPartitionSketchBytes
+ maxRetainedPartitionSketchBytes,
+ maxInputBytesPerWorker
);
stageTracker.put(nextStage, stageKernel);
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java
index 6619f389f6..e0190bfacb 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java
@@ -167,10 +167,11 @@ class ControllerStageTracker
final Int2IntMap stageWorkerCountMap,
final InputSpecSlicer slicer,
final WorkerAssignmentStrategy assignmentStrategy,
- final int maxRetainedPartitionSketchBytes
+ final int maxRetainedPartitionSketchBytes,
+ final long maxInputBytesPerWorker
)
{
- final WorkerInputs workerInputs = WorkerInputs.create(stageDef,
stageWorkerCountMap, slicer, assignmentStrategy);
+ final WorkerInputs workerInputs = WorkerInputs.create(stageDef,
stageWorkerCountMap, slicer, assignmentStrategy, maxInputBytesPerWorker);
return new ControllerStageTracker(
stageDef,
workerInputs,
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java
index da27ceba65..83d7a602bc 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java
@@ -59,7 +59,8 @@ public class WorkerInputs
final StageDefinition stageDef,
final Int2IntMap stageWorkerCountMap,
final InputSpecSlicer slicer,
- final WorkerAssignmentStrategy assignmentStrategy
+ final WorkerAssignmentStrategy assignmentStrategy,
+ final long maxInputBytesPerWorker
)
{
// Split each inputSpec and assign to workers. This list maps worker
number -> input number -> input slice.
@@ -91,7 +92,13 @@ public class WorkerInputs
}
} else {
// Non-broadcast case: split slices across workers.
- List<InputSlice> slices = assignmentStrategy.assign(stageDef,
inputSpec, stageWorkerCountMap, slicer);
+ List<InputSlice> slices = assignmentStrategy.assign(
+ stageDef,
+ inputSpec,
+ stageWorkerCountMap,
+ slicer,
+ maxInputBytesPerWorker
+ );
if (slices.isEmpty()) {
// Need at least one slice, so we can have at least one worker. It's
OK if it has nothing to read.
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
index 6296065762..479bc4de0d 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
@@ -31,6 +31,7 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.msq.indexing.error.ColumnNameRestrictedFault;
import org.apache.druid.msq.indexing.error.RowTooLargeFault;
+import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
import org.apache.druid.msq.test.CounterSnapshotMatcher;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.test.MSQTestFileUtils;
@@ -983,10 +984,9 @@ public class MSQInsertTest extends MSQTestBase
@Test
public void testInsertQueryWithInvalidSubtaskCount()
{
- Map<String, Object> localContext = ImmutableMap.<String, Object>builder()
- .putAll(context)
-
.put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 1)
- .build();
+ Map<String, Object> localContext = new HashMap<>(context);
+ localContext.put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 1);
+
testIngestQuery().setSql(
"insert into foo1 select __time, dim1 , count(*) as
cnt from foo where dim1 is not null group by 1, 2 PARTITIONED by day clustered
by dim1")
.setQueryContext(localContext)
@@ -1065,6 +1065,106 @@ public class MSQInsertTest extends MSQTestBase
.verifyPlanningErrors();
}
+ @Test
+ public void testCorrectNumberOfWorkersUsedAutoModeWithoutBytesLimit() throws
IOException
+ {
+ Map<String, Object> localContext = new HashMap<>(context);
+ localContext.put(MultiStageQueryContext.CTX_TASK_ASSIGNMENT_STRATEGY,
WorkerAssignmentStrategy.AUTO.name());
+ localContext.put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 4);
+
+ final File toRead1 =
MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this,
"/multipleFiles/wikipedia-sampled-1.json");
+ final String toReadFileNameAsJson1 =
queryFramework().queryJsonMapper().writeValueAsString(toRead1.getAbsolutePath());
+
+ final File toRead2 =
MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this,
"/multipleFiles/wikipedia-sampled-2.json");
+ final String toReadFileNameAsJson2 =
queryFramework().queryJsonMapper().writeValueAsString(toRead2.getAbsolutePath());
+
+ final File toRead3 =
MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this,
"/multipleFiles/wikipedia-sampled-3.json");
+ final String toReadFileNameAsJson3 =
queryFramework().queryJsonMapper().writeValueAsString(toRead3.getAbsolutePath());
+
+ RowSignature rowSignature = RowSignature.builder()
+ .add("__time", ColumnType.LONG)
+ .add("cnt", ColumnType.LONG)
+ .build();
+
+ testIngestQuery().setSql(
+ "insert into foo1 select "
+ + " floor(TIME_PARSE(\"timestamp\") to day) AS
__time,\n"
+ + " count(*) as cnt\n"
+ + "FROM TABLE(\n"
+ + " EXTERN(\n"
+ + " '{ \"files\": [" + toReadFileNameAsJson1 + ","
+ toReadFileNameAsJson2 + "," + toReadFileNameAsJson3 +
"],\"type\":\"local\"}',\n"
+ + " '{\"type\": \"json\"}',\n"
+ + " '[{\"name\": \"timestamp\", \"type\":
\"string\"}, {\"name\": \"page\", \"type\": \"string\"}, {\"name\": \"user\",
\"type\": \"string\"}]'\n"
+ + " )\n"
+ + ") group by 1 PARTITIONED by day ")
+ .setExpectedDataSource("foo1")
+ .setQueryContext(localContext)
+ .setExpectedRowSignature(rowSignature)
+ .setExpectedSegment(ImmutableSet.of(SegmentId.of(
+ "foo1",
+ Intervals.of("2016-06-27/P1D"),
+ "test",
+ 0
+ )))
+ .setExpectedResultRows(ImmutableList.of(new
Object[]{1466985600000L, 20L}))
+ .setExpectedWorkerCount(
+ ImmutableMap.of(
+ 0, 1
+ ))
+ .verifyResults();
+
+ }
+
+ @Test
+ public void testCorrectNumberOfWorkersUsedAutoModeWithBytesLimit() throws
IOException
+ {
+ Map<String, Object> localContext = new HashMap<>(context);
+ localContext.put(MultiStageQueryContext.CTX_TASK_ASSIGNMENT_STRATEGY,
WorkerAssignmentStrategy.AUTO.name());
+ localContext.put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 4);
+ localContext.put(MultiStageQueryContext.CTX_MAX_INPUT_BYTES_PER_WORKER,
10);
+
+ final File toRead1 =
MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this,
"/multipleFiles/wikipedia-sampled-1.json");
+ final String toReadFileNameAsJson1 =
queryFramework().queryJsonMapper().writeValueAsString(toRead1.getAbsolutePath());
+
+ final File toRead2 =
MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this,
"/multipleFiles/wikipedia-sampled-2.json");
+ final String toReadFileNameAsJson2 =
queryFramework().queryJsonMapper().writeValueAsString(toRead2.getAbsolutePath());
+
+ final File toRead3 =
MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this,
"/multipleFiles/wikipedia-sampled-3.json");
+ final String toReadFileNameAsJson3 =
queryFramework().queryJsonMapper().writeValueAsString(toRead3.getAbsolutePath());
+
+ RowSignature rowSignature = RowSignature.builder()
+ .add("__time", ColumnType.LONG)
+ .add("cnt", ColumnType.LONG)
+ .build();
+
+ testIngestQuery().setSql(
+ "insert into foo1 select "
+ + " floor(TIME_PARSE(\"timestamp\") to day) AS
__time,\n"
+ + " count(*) as cnt\n"
+ + "FROM TABLE(\n"
+ + " EXTERN(\n"
+ + " '{ \"files\": [" + toReadFileNameAsJson1 + ","
+ toReadFileNameAsJson2 + "," + toReadFileNameAsJson3 +
"],\"type\":\"local\"}',\n"
+ + " '{\"type\": \"json\"}',\n"
+ + " '[{\"name\": \"timestamp\", \"type\":
\"string\"}, {\"name\": \"page\", \"type\": \"string\"}, {\"name\": \"user\",
\"type\": \"string\"}]'\n"
+ + " )\n"
+ + ") group by 1 PARTITIONED by day ")
+ .setExpectedDataSource("foo1")
+ .setQueryContext(localContext)
+ .setExpectedRowSignature(rowSignature)
+ .setExpectedSegment(ImmutableSet.of(SegmentId.of(
+ "foo1",
+ Intervals.of("2016-06-27/P1D"),
+ "test",
+ 0
+ )))
+ .setExpectedResultRows(ImmutableList.of(new
Object[]{1466985600000L, 20L}))
+ .setExpectedWorkerCount(
+ ImmutableMap.of(
+ 0, 3
+ ))
+ .verifyResults();
+ }
+
@Test
public void testInsertArraysAutoType() throws IOException
{
@@ -1120,7 +1220,6 @@ public class MSQInsertTest extends MSQTestBase
.setExpectedDataSource("foo1")
.setExpectedRowSignature(rowSignature)
.verifyResults();
-
}
@Nonnull
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/StageDefinitionTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/StageDefinitionTest.java
index 793c4293f1..20110b6332 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/StageDefinitionTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/StageDefinitionTest.java
@@ -26,7 +26,6 @@ import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.frame.key.KeyColumn;
import org.apache.druid.frame.key.KeyOrder;
import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.input.stage.StageInputSpec;
import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory;
import org.apache.druid.msq.statistics.ClusterByStatisticsCollectorImpl;
@@ -57,8 +56,7 @@ public class StageDefinitionTest
RowSignature.empty(),
null,
0,
- false,
- Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
+ false
);
Assert.assertThrows(ISE.class, () ->
stageDefinition.generatePartitionBoundariesForShuffle(null));
@@ -79,8 +77,7 @@ public class StageDefinitionTest
false
),
1,
- false,
- Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
+ false
);
Assert.assertThrows(ISE.class, () ->
stageDefinition.generatePartitionBoundariesForShuffle(null));
@@ -101,8 +98,7 @@ public class StageDefinitionTest
false
),
1,
- false,
- Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
+ false
);
Assert.assertThrows(
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/BaseControllerQueryKernelTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/BaseControllerQueryKernelTest.java
index 1e0177ae12..6ae18dda1e 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/BaseControllerQueryKernelTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/BaseControllerQueryKernelTest.java
@@ -27,6 +27,7 @@ import org.apache.druid.frame.key.RowKey;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.indexing.error.MSQFault;
import org.apache.druid.msq.indexing.error.UnknownFault;
import org.apache.druid.msq.input.InputSpecSlicerFactory;
@@ -232,7 +233,8 @@ public class BaseControllerQueryKernelTest extends
InitializedNullHandlingTest
return mapStageIdsToStageNumbers(
controllerQueryKernel.createAndGetNewStageIds(
inputSlicerFactory,
- WorkerAssignmentStrategy.MAX
+ WorkerAssignmentStrategy.MAX,
+ Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
)
);
}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java
index 6a97e0b0bc..f59ae121d4 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java
@@ -25,6 +25,7 @@ import it.unimi.dsi.fastutil.ints.IntSet;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import it.unimi.dsi.fastutil.longs.LongList;
import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.input.InputSlice;
import org.apache.druid.msq.input.InputSpec;
import org.apache.druid.msq.input.InputSpecSlicer;
@@ -35,12 +36,19 @@ import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.Mockito;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+
public class WorkerInputsTest
{
private static final String QUERY_ID = "myQuery";
@@ -59,7 +67,8 @@ public class WorkerInputsTest
stageDef,
Int2IntMaps.EMPTY_MAP,
new TestInputSpecSlicer(true),
- WorkerAssignmentStrategy.MAX
+ WorkerAssignmentStrategy.MAX,
+ Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
);
Assert.assertEquals(
@@ -87,7 +96,8 @@ public class WorkerInputsTest
stageDef,
Int2IntMaps.EMPTY_MAP,
new TestInputSpecSlicer(true),
- WorkerAssignmentStrategy.MAX
+ WorkerAssignmentStrategy.MAX,
+ Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
);
Assert.assertEquals(
@@ -115,7 +125,8 @@ public class WorkerInputsTest
stageDef,
Int2IntMaps.EMPTY_MAP,
new TestInputSpecSlicer(true),
- WorkerAssignmentStrategy.AUTO
+ WorkerAssignmentStrategy.AUTO,
+ Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
);
Assert.assertEquals(
@@ -140,7 +151,8 @@ public class WorkerInputsTest
stageDef,
Int2IntMaps.EMPTY_MAP,
new TestInputSpecSlicer(true),
- WorkerAssignmentStrategy.AUTO
+ WorkerAssignmentStrategy.AUTO,
+ Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
);
Assert.assertEquals(
@@ -166,7 +178,8 @@ public class WorkerInputsTest
stageDef,
Int2IntMaps.EMPTY_MAP,
new TestInputSpecSlicer(true),
- WorkerAssignmentStrategy.AUTO
+ WorkerAssignmentStrategy.AUTO,
+ Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
);
Assert.assertEquals(
@@ -191,7 +204,8 @@ public class WorkerInputsTest
stageDef,
Int2IntMaps.EMPTY_MAP,
new TestInputSpecSlicer(true),
- WorkerAssignmentStrategy.AUTO
+ WorkerAssignmentStrategy.AUTO,
+ Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
);
Assert.assertEquals(
@@ -216,7 +230,8 @@ public class WorkerInputsTest
stageDef,
Int2IntMaps.EMPTY_MAP,
new TestInputSpecSlicer(true),
- WorkerAssignmentStrategy.AUTO
+ WorkerAssignmentStrategy.AUTO,
+ Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
);
Assert.assertEquals(
@@ -242,7 +257,8 @@ public class WorkerInputsTest
stageDef,
Int2IntMaps.EMPTY_MAP,
new TestInputSpecSlicer(true),
- WorkerAssignmentStrategy.AUTO
+ WorkerAssignmentStrategy.AUTO,
+ Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
);
Assert.assertEquals(
@@ -274,7 +290,8 @@ public class WorkerInputsTest
stageDef,
Int2IntMaps.EMPTY_MAP,
new TestInputSpecSlicer(true),
- WorkerAssignmentStrategy.AUTO
+ WorkerAssignmentStrategy.AUTO,
+ Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
);
Assert.assertEquals(
@@ -288,6 +305,127 @@ public class WorkerInputsTest
);
}
+ @Test
+ public void test_max_shouldAlwaysSplitStatic()
+ {
+ TestInputSpec inputSpecToSplit = new TestInputSpec(4_000_000_000L);
+ final StageDefinition stageDef =
+ StageDefinition.builder(0)
+ .inputs(inputSpecToSplit)
+ .maxWorkerCount(3)
+ .processorFactory(new
OffsetLimitFrameProcessorFactory(0, 0L))
+ .build(QUERY_ID);
+
+ TestInputSpecSlicer testInputSpecSlicer = spy(new
TestInputSpecSlicer(true));
+
+ final WorkerInputs inputs = WorkerInputs.create(
+ stageDef,
+ Int2IntMaps.EMPTY_MAP,
+ testInputSpecSlicer,
+ WorkerAssignmentStrategy.MAX,
+ Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
+ );
+
+ Mockito.verify(testInputSpecSlicer,
times(0)).canSliceDynamic(inputSpecToSplit);
+ Mockito.verify(testInputSpecSlicer, times(1)).sliceStatic(any(), anyInt());
+
+ Assert.assertEquals(
+ ImmutableMap.<Integer, List<InputSlice>>builder()
+ .put(
+ 0,
+ Collections.singletonList(new
TestInputSlice(4_000_000_000L))
+ )
+ .put(
+ 1,
+ Collections.singletonList(new TestInputSlice())
+ )
+ .put(
+ 2,
+ Collections.singletonList(new TestInputSlice())
+ )
+ .build(),
+ inputs.assignmentsMap()
+ );
+
+ }
+
+ @Test
+ public void test_auto_shouldSplitDynamicIfPossible()
+ {
+ TestInputSpec inputSpecToSplit = new TestInputSpec(1_000_000_000L,
1_000_000_000L, 1_000_000_000L);
+ final StageDefinition stageDef =
+ StageDefinition.builder(0)
+ .inputs(inputSpecToSplit)
+ .maxWorkerCount(3)
+ .processorFactory(new
OffsetLimitFrameProcessorFactory(0, 0L))
+ .build(QUERY_ID);
+
+ TestInputSpecSlicer testInputSpecSlicer = spy(new
TestInputSpecSlicer(true));
+
+ final WorkerInputs inputs = WorkerInputs.create(
+ stageDef,
+ Int2IntMaps.EMPTY_MAP,
+ testInputSpecSlicer,
+ WorkerAssignmentStrategy.AUTO,
+ 100
+ );
+
+ Mockito.verify(testInputSpecSlicer,
times(1)).canSliceDynamic(inputSpecToSplit);
+ Mockito.verify(testInputSpecSlicer, times(1)).sliceDynamic(any(),
anyInt(), anyInt(), anyLong());
+
+ Assert.assertEquals(
+ ImmutableMap.<Integer, List<InputSlice>>builder()
+ .put(
+ 0,
+ Collections.singletonList(new
TestInputSlice(1_000_000_000L))
+ )
+ .put(
+ 1,
+ Collections.singletonList(new
TestInputSlice(1_000_000_000L))
+ )
+ .put(
+ 2,
+ Collections.singletonList(new
TestInputSlice(1_000_000_000L))
+ )
+ .build(),
+ inputs.assignmentsMap()
+ );
+ }
+
+ @Test
+ public void test_auto_shouldUseLeastWorkersPossible()
+ {
+ TestInputSpec inputSpecToSplit = new TestInputSpec(1_000_000_000L,
1_000_000_000L, 1_000_000_000L);
+ final StageDefinition stageDef =
+ StageDefinition.builder(0)
+ .inputs(inputSpecToSplit)
+ .maxWorkerCount(3)
+ .processorFactory(new
OffsetLimitFrameProcessorFactory(0, 0L))
+ .build(QUERY_ID);
+
+ TestInputSpecSlicer testInputSpecSlicer = spy(new
TestInputSpecSlicer(true));
+
+ final WorkerInputs inputs = WorkerInputs.create(
+ stageDef,
+ Int2IntMaps.EMPTY_MAP,
+ testInputSpecSlicer,
+ WorkerAssignmentStrategy.AUTO,
+ Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
+ );
+
+ Assert.assertEquals(
+ ImmutableMap.<Integer, List<InputSlice>>builder()
+ .put(
+ 0,
+ Collections.singletonList(new
TestInputSlice(1_000_000_000L, 1_000_000_000L, 1_000_000_000L))
+ )
+ .build(),
+ inputs.assignmentsMap()
+ );
+
+ Mockito.verify(testInputSpecSlicer,
times(1)).canSliceDynamic(inputSpecToSplit);
+ }
+
@Test
public void testEquals()
{
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index d768c73d80..e44d1974b8 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -225,6 +225,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
.put(QueryContexts.CTX_SQL_QUERY_ID, "test-query")
.put(QueryContexts.FINALIZE_KEY, true)
.put(QueryContexts.CTX_SQL_STRINGIFY_ARRAYS, false)
+ .put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 2)
.put(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED, 0)
.build();
@@ -799,6 +800,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
protected Matcher<Throwable> expectedExecutionErrorMatcher = null;
protected MSQFault expectedMSQFault = null;
protected Class<? extends MSQFault> expectedMSQFaultClass = null;
+ protected Map<Integer, Integer> expectedStageVsWorkerCount = new
HashMap<>();
protected final Map<Integer, Map<Integer, Map<String,
CounterSnapshotMatcher>>>
expectedStageWorkerChannelToCounters = new HashMap<>();
@@ -893,6 +895,12 @@ public class MSQTestBase extends BaseCalciteQueryTest
return asBuilder();
}
+ public Builder setExpectedWorkerCount(Map<Integer, Integer>
stageVsWorkerCount)
+ {
+ this.expectedStageVsWorkerCount = stageVsWorkerCount;
+ return asBuilder();
+ }
+
public Builder setExpectedSegmentGenerationProgressCountersForStageWorker(
CounterSnapshotMatcher counterSnapshot,
int stage,
@@ -925,6 +933,14 @@ public class MSQTestBase extends BaseCalciteQueryTest
MatcherAssert.assertThat(e, expectedValidationErrorMatcher);
}
+ protected void verifyWorkerCount(CounterSnapshotsTree counterSnapshotsTree)
+ {
+ Map<Integer, Map<Integer, CounterSnapshots>> counterMap =
counterSnapshotsTree.copyMap();
+ for (Map.Entry<Integer, Integer> stageWorkerCount :
expectedStageVsWorkerCount.entrySet()) {
+ Assert.assertEquals(stageWorkerCount.getValue().intValue(),
counterMap.get(stageWorkerCount.getKey()).size());
+ }
+ }
+
protected void verifyCounters(CounterSnapshotsTree counterSnapshotsTree)
{
Assert.assertNotNull(counterSnapshotsTree);
@@ -1061,6 +1077,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
return;
}
MSQTaskReportPayload reportPayload = getPayloadOrThrow(controllerId);
+ verifyWorkerCount(reportPayload.getCounters());
verifyCounters(reportPayload.getCounters());
MSQSpec foundSpec =
indexingServiceClient.getQuerySpecForTask(controllerId);
@@ -1270,6 +1287,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
MSQTaskReportPayload payload = getPayloadOrThrow(controllerId);
verifyCounters(payload.getCounters());
+ verifyWorkerCount(payload.getCounters());
if (payload.getStatus().getErrorReport() != null) {
throw new ISE("Query %s failed due to %s", sql,
payload.getStatus().getErrorReport().toString());
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
index 73cd900a5b..58ee01f008 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
@@ -63,10 +63,12 @@ import java.util.stream.Collectors;
public class MSQTestControllerContext implements ControllerContext
{
private static final Logger log = new Logger(MSQTestControllerContext.class);
+ private static final int NUM_WORKERS = 4;
private final TaskActionClient taskActionClient;
private final Map<String, Worker> inMemoryWorkers = new HashMap<>();
private final ConcurrentMap<String, TaskStatus> statusMap = new
ConcurrentHashMap<>();
- private final ListeningExecutorService executor =
MoreExecutors.listeningDecorator(Execs.singleThreaded(
+ private final ListeningExecutorService executor =
MoreExecutors.listeningDecorator(Execs.multiThreaded(
+ NUM_WORKERS,
"MultiStageQuery-test-controller-client"));
private final CoordinatorClient coordinatorClient;
private final DruidNode node = new DruidNode(
diff --git
a/extensions-core/multi-stage-query/src/test/resources/multipleFiles/wikipedia-sampled-1.json
b/extensions-core/multi-stage-query/src/test/resources/multipleFiles/wikipedia-sampled-1.json
new file mode 100644
index 0000000000..26002ac687
--- /dev/null
+++
b/extensions-core/multi-stage-query/src/test/resources/multipleFiles/wikipedia-sampled-1.json
@@ -0,0 +1,7 @@
+{"isRobot":true,"channel":"#sv.wikipedia","timestamp":"2016-06-27T00:00:11.080Z","flags":"NB","isUnpatrolled":false,"page":"Salo
Toraut","diffUrl":"https://sv.wikipedia.org/w/index.php?oldid=36099284&rcid=89369918","added":31,"comment":"Botskapande
Indonesien
omdirigering","commentLength":35,"isNew":true,"isMinor":false,"delta":31,"isAnonymous":false,"user":"Lsjbot","deltaBucket":0.0,"deleted":0,"namespace":"Main"}
+{"isRobot":false,"channel":"#en.wikipedia","cityName":"Buenos
Aires","timestamp":"2016-06-27T00:00:34.959Z","flags":"","isUnpatrolled":false,"page":"Bailando
2015","countryName":"Argentina","regionIsoCode":"C","diffUrl":"https://en.wikipedia.org/w/index.php?diff=727144213&oldid=727144184","added":2,"metroCode":null,"comment":"/*
Scores
*/","commentLength":12,"isNew":false,"isMinor":false,"delta":2,"countryIsoCode":"AR","isAnonymous":true,"user":"181.230.118.178","regionName":"Buenos
Aire [...]
+{"isRobot":false,"channel":"#en.wikipedia","timestamp":"2016-06-27T00:00:36.027Z","flags":"M","isUnpatrolled":false,"page":"Richie
Rich's Christmas
Wish","diffUrl":"https://en.wikipedia.org/w/index.php?diff=727144214&oldid=716794477","added":0,"comment":"standard
term is [[title
character]]","commentLength":36,"isNew":false,"isMinor":true,"delta":-2,"isAnonymous":false,"user":"JasonAQuest","deltaBucket":-100.0,"deleted":2,"namespace":"Main"}
+{"isRobot":true,"channel":"#pl.wikipedia","timestamp":"2016-06-27T00:00:58.599Z","flags":"NB","isUnpatrolled":false,"page":"Kategoria:Dyskusje
nad usunięciem artykułu zakończone bez konsensusu − lipiec
2016","diffUrl":"https://pl.wikipedia.org/w/index.php?oldid=46204477&rcid=68522573","added":270,"comment":"utworzenie
kategorii","commentLength":20,"isNew":true,"isMinor":false,"delta":270,"isAnonymous":false,"user":"Beau.bot","deltaBucket":200.0,"deleted":0,"namespace":"Kategoria"}
+{"isRobot":true,"channel":"#sh.wikipedia","timestamp":"2016-06-27T00:01:03.685Z","flags":"MB","isUnpatrolled":false,"page":"El
Terco,
Bachíniva","diffUrl":"https://sh.wikipedia.org/w/index.php?diff=29388566&oldid=29282572&rcid=34712923","added":0,"comment":"Bot:
Automatska zamjena teksta (-[[Administrativna podjela Meksika|Admin]]
+[[Administrativna podjela
Meksika|Admi]])","commentLength":118,"isNew":false,"isMinor":true,"delta":-1,"isAnonymous":false,"user":"Kolega2357","deltaBucket":
[...]
+{"isRobot":true,"channel":"#ceb.wikipedia","timestamp":"2016-06-27T00:01:07.347Z","flags":"NB","isUnpatrolled":false,"page":"Neqerssuaq","diffUrl":"https://ceb.wikipedia.org/w/index.php?oldid=9563239&rcid=36193146","added":4150,"comment":"Paghimo
ni bot
Greenland","commentLength":24,"isNew":true,"isMinor":false,"delta":4150,"isAnonymous":false,"user":"Lsjbot","deltaBucket":4100.0,"deleted":0,"namespace":"Main"}
+{"isRobot":false,"channel":"#es.wikipedia","cityName":null,"timestamp":"2016-06-27T00:01:14.343Z","flags":"","isUnpatrolled":false,"page":"Sumo
(banda)","countryName":"Argentina","regionIsoCode":null,"diffUrl":"https://es.wikipedia.org/w/index.php?diff=91937261&oldid=91937229","added":0,"metroCode":null,"comment":"/*
Línea de tiempo
*/","commentLength":21,"isNew":false,"isMinor":false,"delta":-173,"countryIsoCode":"AR","isAnonymous":true,"user":"181.110.165.189","regionName":null,"deltaB
[...]
diff --git
a/extensions-core/multi-stage-query/src/test/resources/multipleFiles/wikipedia-sampled-2.json
b/extensions-core/multi-stage-query/src/test/resources/multipleFiles/wikipedia-sampled-2.json
new file mode 100644
index 0000000000..ae560bf8f2
--- /dev/null
+++
b/extensions-core/multi-stage-query/src/test/resources/multipleFiles/wikipedia-sampled-2.json
@@ -0,0 +1,7 @@
+{"isRobot":false,"channel":"#en.wikipedia","timestamp":"2016-06-27T00:01:59.599Z","flags":"","isUnpatrolled":false,"page":"Panama
Canal","diffUrl":"https://en.wikipedia.org/w/index.php?diff=727144327&oldid=727144068","added":496,"comment":"expanding
lead + missing
RS","commentLength":27,"isNew":false,"isMinor":false,"delta":496,"isAnonymous":false,"user":"Mariordo","deltaBucket":400.0,"deleted":0,"namespace":"Main"}
+{"isRobot":false,"channel":"#ru.wikipedia","timestamp":"2016-06-27T00:02:09.238Z","flags":"","isUnpatrolled":false,"page":"Википедия:Опросы/Унификация
шаблонов «Не
переведено»","diffUrl":"https://ru.wikipedia.org/w/index.php?diff=79213888&oldid=79213880","added":196,"comment":"/*
Нет
*/","commentLength":9,"isNew":false,"isMinor":false,"delta":196,"isAnonymous":false,"user":"Wanderer777","deltaBucket":100.0,"deleted":0,"namespace":"Википедия"}
+{"isRobot":true,"channel":"#sh.wikipedia","timestamp":"2016-06-27T00:02:10.857Z","flags":"MB","isUnpatrolled":false,"page":"Hermanos
Díaz,
Ascensión","diffUrl":"https://sh.wikipedia.org/w/index.php?diff=29388624&oldid=29282630&rcid=34712981","added":0,"comment":"Bot:
Automatska zamjena teksta (-[[Administrativna podjela Meksika|Admin]]
+[[Administrativna podjela
Meksika|Admi]])","commentLength":118,"isNew":false,"isMinor":true,"delta":-1,"isAnonymous":false,"user":"Kolega2357","deltaBuc
[...]
+{"isRobot":false,"channel":"#es.wikipedia","timestamp":"2016-06-27T01:02:13.153Z","flags":"","isUnpatrolled":false,"page":"Clasificación
para la Eurocopa Sub-21 de
2017","diffUrl":"https://es.wikipedia.org/w/index.php?diff=91937277&oldid=91937272","added":4,"comment":"/*
Máximos Asistentes
*/","commentLength":24,"isNew":false,"isMinor":false,"delta":4,"isAnonymous":false,"user":"Guly600","deltaBucket":0.0,"deleted":0,"namespace":"Main"}
+{"isRobot":false,"channel":"#zh.wikipedia","timestamp":"2016-06-27T01:02:13.815Z","flags":"","isUnpatrolled":false,"page":"中共十八大以来的反腐败工作","diffUrl":"https://zh.wikipedia.org/w/index.php?diff=40605390&oldid=40605381","added":18,"comment":"/*
违反中共中央八项规定官员(副部级及以上)
*/","commentLength":26,"isNew":false,"isMinor":false,"delta":18,"isAnonymous":false,"user":"2001:DA8:207:E132:94DC:BA03:DFDF:8F9F","deltaBucket":0.0,"deleted":0,"namespace":"Main"}
+{"isRobot":true,"channel":"#sh.wikipedia","timestamp":"2016-06-27T01:02:15.952Z","flags":"MB","isUnpatrolled":false,"page":"El
Sicomoro,
Ascensión","diffUrl":"https://sh.wikipedia.org/w/index.php?diff=29388628&oldid=29282634&rcid=34712985","added":0,"comment":"Bot:
Automatska zamjena teksta (-[[Administrativna podjela Meksika|Admin]]
+[[Administrativna podjela
Meksika|Admi]])","commentLength":118,"isNew":false,"isMinor":true,"delta":-1,"isAnonymous":false,"user":"Kolega2357","deltaBucke
[...]
+{"isRobot":false,"channel":"#id.wikipedia","timestamp":"2016-06-27T01:02:20.008Z","flags":"!","isUnpatrolled":true,"page":"Ibnu
Sina","diffUrl":"https://id.wikipedia.org/w/index.php?diff=11687177&oldid=11444059&rcid=20812462","added":106,"comment":"gambar","commentLength":6,"isNew":false,"isMinor":false,"delta":106,"isAnonymous":false,"user":"Ftihikam","deltaBucket":100.0,"deleted":0,"namespace":"Main"}
diff --git
a/extensions-core/multi-stage-query/src/test/resources/multipleFiles/wikipedia-sampled-3.json
b/extensions-core/multi-stage-query/src/test/resources/multipleFiles/wikipedia-sampled-3.json
new file mode 100644
index 0000000000..6604b51fb1
--- /dev/null
+++
b/extensions-core/multi-stage-query/src/test/resources/multipleFiles/wikipedia-sampled-3.json
@@ -0,0 +1,6 @@
+{"isRobot":false,"channel":"#pt.wikipedia","timestamp":"2016-06-27T02:02:22.868Z","flags":"N","isUnpatrolled":false,"page":"Dobromir
Zhechev","diffUrl":"https://pt.wikipedia.org/w/index.php?oldid=46012430&rcid=67145794","added":1926,"comment":"Novo","commentLength":4,"isNew":true,"isMinor":false,"delta":1926,"isAnonymous":false,"user":"Ceresta","deltaBucket":1900.0,"deleted":0,"namespace":"Main"}
+{"isRobot":true,"channel":"#de.wikipedia","timestamp":"2016-06-27T02:02:29.228Z","flags":"B","isUnpatrolled":false,"page":"Benutzer
Diskussion:Squasher/Archiv/2016","diffUrl":"https://de.wikipedia.org/w/index.php?diff=155658493&oldid=155085489","added":2560,"comment":"1
Abschnitt aus [[Benutzer Diskussion:Squasher]]
archiviert","commentLength":60,"isNew":false,"isMinor":false,"delta":2560,"isAnonymous":false,"user":"TaxonBot","deltaBucket":2500.0,"deleted":0,"namespace":"Benutzer
Diskussion"}
+{"isRobot":true,"channel":"#sh.wikipedia","timestamp":"2016-06-27T02:02:50.958Z","flags":"MB","isUnpatrolled":false,"page":"Trinidad
Jiménez G., Benemérito de las
Américas","diffUrl":"https://sh.wikipedia.org/w/index.php?diff=29388657&oldid=29282663&rcid=34713014","added":0,"comment":"Bot:
Automatska zamjena teksta (-[[Administrativna podjela Meksika|Admin]]
+[[Administrativna podjela
Meksika|Admi]])","commentLength":118,"isNew":false,"isMinor":true,"delta":-1,"isAnonymous":false,"user"
[...]
+{"isRobot":false,"channel":"#zh.wikipedia","timestamp":"2016-06-27T02:02:59.521Z","flags":"N","isUnpatrolled":false,"page":"Wikipedia:頁面存廢討論/記錄/2016/06/27","diffUrl":"https://zh.wikipedia.org/w/index.php?oldid=40605393&rcid=62748523","added":1986,"comment":"添加[[李洛能八大弟子]]","commentLength":13,"isNew":true,"isMinor":false,"delta":1986,"isAnonymous":false,"user":"Tigerzeng","deltaBucket":1900.0,"deleted":0,"namespace":"Wikipedia"}
+{"isRobot":false,"channel":"#en.wikipedia","timestamp":"2016-06-27T02:03:01.090Z","flags":"","isUnpatrolled":false,"page":"File:Paint.net
4.0.6
screenshot.png","diffUrl":"https://en.wikipedia.org/w/index.php?diff=727144426&oldid=713167833","added":0,"comment":"/*
Summary
*/","commentLength":13,"isNew":false,"isMinor":false,"delta":-463,"isAnonymous":false,"user":"Calvin
Hogg","deltaBucket":-500.0,"deleted":463,"namespace":"File"}
+{"isRobot":true,"channel":"#de.wikipedia","timestamp":"2016-06-27T02:03:05.240Z","flags":"B","isUnpatrolled":false,"page":"Benutzer
Diskussion:HerrSonderbar","diffUrl":"https://de.wikipedia.org/w/index.php?diff=155658496&oldid=155657380","added":364,"comment":"Neuer
Abschnitt /* Benachrichtigung über inaktive Mentees am 27. 6. 2016
*/","commentLength":75,"isNew":false,"isMinor":false,"delta":364,"isAnonymous":false,"user":"GiftBot","deltaBucket":300.0,"deleted":0,"namespace":"Benutzer
Di [...]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]