This is an automated email from the ASF dual-hosted git repository.

lakshsingla pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 10bce22e68 Configure maxBytesPerWorker directly instead of using StageDefinition (#14257)
10bce22e68 is described below

commit 10bce22e680f590e8c07fff980fcbe6c7fb0ea33
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Mon May 15 16:51:57 2023 +0530

    Configure maxBytesPerWorker directly instead of using StageDefinition (#14257)
    
    * Configure maxBytesPerWorker directly instead of using StageDefinition
---
 .../org/apache/druid/msq/exec/ControllerImpl.java  |  10 +-
 .../apache/druid/msq/kernel/StageDefinition.java   |  20 +--
 .../druid/msq/kernel/StageDefinitionBuilder.java   |  11 +-
 .../druid/msq/kernel/WorkerAssignmentStrategy.java |  27 ++--
 .../kernel/controller/ControllerQueryKernel.java   |  11 +-
 .../kernel/controller/ControllerStageTracker.java  |   5 +-
 .../druid/msq/kernel/controller/WorkerInputs.java  |  11 +-
 .../org/apache/druid/msq/exec/MSQInsertTest.java   | 109 +++++++++++++-
 .../druid/msq/kernel/StageDefinitionTest.java      |  10 +-
 .../controller/BaseControllerQueryKernelTest.java  |   4 +-
 .../msq/kernel/controller/WorkerInputsTest.java    | 156 +++++++++++++++++++--
 .../org/apache/druid/msq/test/MSQTestBase.java     |  18 +++
 .../druid/msq/test/MSQTestControllerContext.java   |   4 +-
 .../multipleFiles/wikipedia-sampled-1.json         |   7 +
 .../multipleFiles/wikipedia-sampled-2.json         |   7 +
 .../multipleFiles/wikipedia-sampled-3.json         |   6 +
 16 files changed, 345 insertions(+), 71 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index a7aeffe0b1..b39249d811 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -1599,14 +1599,10 @@ public class ControllerImpl implements Controller
       final DataSchema dataSchema =
           generateDataSchema(querySpec, querySignature, queryClusterBy, 
columnMappings, jsonMapper);
 
-      final long maxInputBytesPerWorker =
-          
MultiStageQueryContext.getMaxInputBytesPerWorker(querySpec.getQuery().context());
-
       builder.add(
           StageDefinition.builder(queryDef.getNextStageNumber())
                          .inputs(new 
StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber()))
                          .maxWorkerCount(tuningConfig.getMaxNumWorkers())
-                         .maxInputBytesPerWorker(maxInputBytesPerWorker)
                          .processorFactory(
                              new SegmentGeneratorFrameProcessorFactory(
                                  dataSchema,
@@ -2406,10 +2402,14 @@ public class ControllerImpl implements Controller
      */
     private void startStages() throws IOException, InterruptedException
     {
+      final long maxInputBytesPerWorker =
+          
MultiStageQueryContext.getMaxInputBytesPerWorker(task.getQuerySpec().getQuery().context());
+
       logKernelStatus(queryDef.getQueryId(), queryKernel);
       final List<StageId> newStageIds = queryKernel.createAndGetNewStageIds(
           inputSpecSlicerFactory,
-          task.getQuerySpec().getAssignmentStrategy()
+          task.getQuerySpec().getAssignmentStrategy(),
+          maxInputBytesPerWorker
       );
 
       for (final StageId stageId : newStageIds) {
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
index 9892eae824..d333edb7d2 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
@@ -40,7 +40,6 @@ import org.apache.druid.frame.write.FrameWriters;
 import org.apache.druid.java.util.common.Either;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.msq.exec.Limits;
 import org.apache.druid.msq.input.InputSpec;
 import org.apache.druid.msq.input.InputSpecs;
 import org.apache.druid.msq.statistics.ClusterByStatisticsCollector;
@@ -93,7 +92,6 @@ public class StageDefinition
   private final FrameProcessorFactory processorFactory;
   private final RowSignature signature;
   private final int maxWorkerCount;
-  private final long maxInputBytesPerWorker;
   private final boolean shuffleCheckHasMultipleValues;
 
   @Nullable
@@ -111,8 +109,7 @@ public class StageDefinition
       @JsonProperty("signature") final RowSignature signature,
       @Nullable @JsonProperty("shuffleSpec") final ShuffleSpec shuffleSpec,
       @JsonProperty("maxWorkerCount") final int maxWorkerCount,
-      @JsonProperty("shuffleCheckHasMultipleValues") final boolean 
shuffleCheckHasMultipleValues,
-      @JsonProperty("maxInputBytesPerWorker") final Long maxInputBytesPerWorker
+      @JsonProperty("shuffleCheckHasMultipleValues") final boolean 
shuffleCheckHasMultipleValues
   )
   {
     this.id = Preconditions.checkNotNull(id, "id");
@@ -132,8 +129,6 @@ public class StageDefinition
     this.maxWorkerCount = maxWorkerCount;
     this.shuffleCheckHasMultipleValues = shuffleCheckHasMultipleValues;
     this.frameReader = Suppliers.memoize(() -> 
FrameReader.create(signature))::get;
-    this.maxInputBytesPerWorker = maxInputBytesPerWorker == null ?
-                                  Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER : 
maxInputBytesPerWorker;
 
     if (mustGatherResultKeyStatistics() && 
shuffleSpec.clusterBy().getColumns().isEmpty()) {
       throw new IAE("Cannot shuffle with spec [%s] and nil clusterBy", 
shuffleSpec);
@@ -280,12 +275,6 @@ public class StageDefinition
     return maxWorkerCount;
   }
 
-  @JsonProperty
-  public long getMaxInputBytesPerWorker()
-  {
-    return maxInputBytesPerWorker;
-  }
-
   @JsonProperty("shuffleCheckHasMultipleValues")
   @JsonInclude(JsonInclude.Include.NON_DEFAULT)
   boolean getShuffleCheckHasMultipleValues()
@@ -412,8 +401,7 @@ public class StageDefinition
            && Objects.equals(broadcastInputNumbers, that.broadcastInputNumbers)
            && Objects.equals(processorFactory, that.processorFactory)
            && Objects.equals(signature, that.signature)
-           && Objects.equals(shuffleSpec, that.shuffleSpec)
-           && Objects.equals(maxInputBytesPerWorker, 
that.maxInputBytesPerWorker);
+           && Objects.equals(shuffleSpec, that.shuffleSpec);
   }
 
   @Override
@@ -427,8 +415,7 @@ public class StageDefinition
         signature,
         maxWorkerCount,
         shuffleCheckHasMultipleValues,
-        shuffleSpec,
-        maxInputBytesPerWorker
+        shuffleSpec
     );
   }
 
@@ -444,7 +431,6 @@ public class StageDefinition
            ", maxWorkerCount=" + maxWorkerCount +
            ", shuffleSpec=" + shuffleSpec +
            (shuffleCheckHasMultipleValues ? ", shuffleCheckHasMultipleValues=" 
+ shuffleCheckHasMultipleValues : "") +
-           ", maxInputBytesPerWorker=" + maxInputBytesPerWorker +
            '}';
   }
 }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinitionBuilder.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinitionBuilder.java
index 3ff9594123..ebf5cfd709 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinitionBuilder.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinitionBuilder.java
@@ -21,7 +21,6 @@ package org.apache.druid.msq.kernel;
 
 import it.unimi.dsi.fastutil.ints.IntRBTreeSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
-import org.apache.druid.msq.exec.Limits;
 import org.apache.druid.msq.input.InputSpec;
 import org.apache.druid.segment.column.RowSignature;
 
@@ -43,7 +42,6 @@ public class StageDefinitionBuilder
   private int maxWorkerCount = 1;
   private ShuffleSpec shuffleSpec = null;
   private boolean shuffleCheckHasMultipleValues = false;
-  private long maxInputBytesPerWorker = 
Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER;
 
   /**
    * Package-private: callers should prefer {@link 
StageDefinition#builder(int)} rather than this constructor.
@@ -107,12 +105,6 @@ public class StageDefinitionBuilder
     return this;
   }
 
-  public StageDefinitionBuilder maxInputBytesPerWorker(final long 
maxInputBytesPerWorker)
-  {
-    this.maxInputBytesPerWorker = maxInputBytesPerWorker;
-    return this;
-  }
-
   int getStageNumber()
   {
     return stageNumber;
@@ -133,8 +125,7 @@ public class StageDefinitionBuilder
         signature,
         shuffleSpec,
         maxWorkerCount,
-        shuffleCheckHasMultipleValues,
-        maxInputBytesPerWorker
+        shuffleCheckHasMultipleValues
     );
   }
 }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java
index c813778f45..8d1832be17 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java
@@ -45,10 +45,11 @@ public enum WorkerAssignmentStrategy
   MAX {
     @Override
     public List<InputSlice> assign(
-        StageDefinition stageDef,
-        InputSpec inputSpec,
-        Int2IntMap stageWorkerCountMap,
-        InputSpecSlicer slicer
+        final StageDefinition stageDef,
+        final InputSpec inputSpec,
+        final Int2IntMap stageWorkerCountMap,
+        final InputSpecSlicer slicer,
+        final long maxInputBytesPerSlice
     )
     {
       return slicer.sliceStatic(inputSpec, stageDef.getMaxWorkerCount());
@@ -57,7 +58,7 @@ public enum WorkerAssignmentStrategy
 
   /**
    * Use the lowest possible number of tasks, while keeping each task's 
workload under
-   * {@link Limits#MAX_INPUT_FILES_PER_WORKER} files and {@link 
StageDefinition#getMaxInputBytesPerWorker()} bytes.
+   * {@link Limits#MAX_INPUT_FILES_PER_WORKER} files and {@code 
maxInputBytesPerWorker} bytes.
    *
    * Implemented using {@link InputSpecSlicer#sliceDynamic} whenever possible.
    */
@@ -67,7 +68,8 @@ public enum WorkerAssignmentStrategy
         final StageDefinition stageDef,
         final InputSpec inputSpec,
         final Int2IntMap stageWorkerCountMap,
-        final InputSpecSlicer slicer
+        final InputSpecSlicer slicer,
+        final long maxInputBytesPerSlice
     )
     {
       if (slicer.canSliceDynamic(inputSpec)) {
@@ -75,7 +77,7 @@ public enum WorkerAssignmentStrategy
             inputSpec,
             stageDef.getMaxWorkerCount(),
             Limits.MAX_INPUT_FILES_PER_WORKER,
-            stageDef.getMaxInputBytesPerWorker()
+            maxInputBytesPerSlice
         );
       } else {
         // In auto mode, if we can't slice inputs dynamically, we instead 
carry forwards the number of workers from
@@ -110,10 +112,19 @@ public enum WorkerAssignmentStrategy
     return StringUtils.toLowerCase(name());
   }
 
+  /**
+   * @param stageDef current stage definition. Contains information on max 
workers, input stage numbers
+   * @param inputSpec inputSpec containing information on where the input is 
read from
+   * @param stageWorkerCountMap map of past stage number vs number of worker 
inputs
+   * @param slicer creates slices of input spec based on other parameters
+   * @param maxInputBytesPerSlice maximum suggested bytes per input slice
+   * @return list containing input slices
+   */
   public abstract List<InputSlice> assign(
       StageDefinition stageDef,
       InputSpec inputSpec,
       Int2IntMap stageWorkerCountMap,
-      InputSpecSlicer slicer
+      InputSpecSlicer slicer,
+      long maxInputBytesPerSlice
   );
 }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
index ddd7b633b3..62a45c4eaa 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
@@ -162,7 +162,8 @@ public class ControllerQueryKernel
    */
   public List<StageId> createAndGetNewStageIds(
       final InputSpecSlicerFactory slicerFactory,
-      final WorkerAssignmentStrategy assignmentStrategy
+      final WorkerAssignmentStrategy assignmentStrategy,
+      final long maxInputBytesPerWorker
   )
   {
     final Int2IntMap stageWorkerCountMap = new Int2IntAVLTreeMap();
@@ -177,7 +178,7 @@ public class ControllerQueryKernel
       }
     }
 
-    createNewKernels(stageWorkerCountMap, 
slicerFactory.makeSlicer(stagePartitionsMap), assignmentStrategy);
+    createNewKernels(stageWorkerCountMap, 
slicerFactory.makeSlicer(stagePartitionsMap), assignmentStrategy, 
maxInputBytesPerWorker);
     return stageTracker.values()
                        .stream()
                        .filter(controllerStageTracker -> 
controllerStageTracker.getPhase() == ControllerStagePhase.NEW)
@@ -292,7 +293,8 @@ public class ControllerQueryKernel
   private void createNewKernels(
       final Int2IntMap stageWorkerCountMap,
       final InputSpecSlicer slicer,
-      final WorkerAssignmentStrategy assignmentStrategy
+      final WorkerAssignmentStrategy assignmentStrategy,
+      final long maxInputBytesPerWorker
   )
   {
     for (final StageId nextStage : readyToRunStages) {
@@ -303,7 +305,8 @@ public class ControllerQueryKernel
           stageWorkerCountMap,
           slicer,
           assignmentStrategy,
-          maxRetainedPartitionSketchBytes
+          maxRetainedPartitionSketchBytes,
+          maxInputBytesPerWorker
       );
       stageTracker.put(nextStage, stageKernel);
     }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java
index 6619f389f6..e0190bfacb 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java
@@ -167,10 +167,11 @@ class ControllerStageTracker
       final Int2IntMap stageWorkerCountMap,
       final InputSpecSlicer slicer,
       final WorkerAssignmentStrategy assignmentStrategy,
-      final int maxRetainedPartitionSketchBytes
+      final int maxRetainedPartitionSketchBytes,
+      final long maxInputBytesPerWorker
   )
   {
-    final WorkerInputs workerInputs = WorkerInputs.create(stageDef, 
stageWorkerCountMap, slicer, assignmentStrategy);
+    final WorkerInputs workerInputs = WorkerInputs.create(stageDef, 
stageWorkerCountMap, slicer, assignmentStrategy, maxInputBytesPerWorker);
     return new ControllerStageTracker(
         stageDef,
         workerInputs,
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java
index da27ceba65..83d7a602bc 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java
@@ -59,7 +59,8 @@ public class WorkerInputs
       final StageDefinition stageDef,
       final Int2IntMap stageWorkerCountMap,
       final InputSpecSlicer slicer,
-      final WorkerAssignmentStrategy assignmentStrategy
+      final WorkerAssignmentStrategy assignmentStrategy,
+      final long maxInputBytesPerWorker
   )
   {
     // Split each inputSpec and assign to workers. This list maps worker 
number -> input number -> input slice.
@@ -91,7 +92,13 @@ public class WorkerInputs
         }
       } else {
         // Non-broadcast case: split slices across workers.
-        List<InputSlice> slices = assignmentStrategy.assign(stageDef, 
inputSpec, stageWorkerCountMap, slicer);
+        List<InputSlice> slices = assignmentStrategy.assign(
+            stageDef,
+            inputSpec,
+            stageWorkerCountMap,
+            slicer,
+            maxInputBytesPerWorker
+        );
 
         if (slices.isEmpty()) {
           // Need at least one slice, so we can have at least one worker. It's 
OK if it has nothing to read.
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
index 6296065762..479bc4de0d 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
@@ -31,6 +31,7 @@ import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.msq.indexing.error.ColumnNameRestrictedFault;
 import org.apache.druid.msq.indexing.error.RowTooLargeFault;
+import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
 import org.apache.druid.msq.test.CounterSnapshotMatcher;
 import org.apache.druid.msq.test.MSQTestBase;
 import org.apache.druid.msq.test.MSQTestFileUtils;
@@ -983,10 +984,9 @@ public class MSQInsertTest extends MSQTestBase
   @Test
   public void testInsertQueryWithInvalidSubtaskCount()
   {
-    Map<String, Object> localContext = ImmutableMap.<String, Object>builder()
-                                                   .putAll(context)
-                                                   
.put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 1)
-                                                   .build();
+    Map<String, Object> localContext = new HashMap<>(context);
+    localContext.put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 1);
+
     testIngestQuery().setSql(
                          "insert into foo1 select  __time, dim1 , count(*) as 
cnt from foo where dim1 is not null group by 1, 2 PARTITIONED by day clustered 
by dim1")
                      .setQueryContext(localContext)
@@ -1065,6 +1065,106 @@ public class MSQInsertTest extends MSQTestBase
                      .verifyPlanningErrors();
   }
 
+  @Test
+  public void testCorrectNumberOfWorkersUsedAutoModeWithoutBytesLimit() throws 
IOException
+  {
+    Map<String, Object> localContext = new HashMap<>(context);
+    localContext.put(MultiStageQueryContext.CTX_TASK_ASSIGNMENT_STRATEGY, 
WorkerAssignmentStrategy.AUTO.name());
+    localContext.put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 4);
+
+    final File toRead1 = 
MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, 
"/multipleFiles/wikipedia-sampled-1.json");
+    final String toReadFileNameAsJson1 = 
queryFramework().queryJsonMapper().writeValueAsString(toRead1.getAbsolutePath());
+
+    final File toRead2 = 
MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, 
"/multipleFiles/wikipedia-sampled-2.json");
+    final String toReadFileNameAsJson2 = 
queryFramework().queryJsonMapper().writeValueAsString(toRead2.getAbsolutePath());
+
+    final File toRead3 = 
MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, 
"/multipleFiles/wikipedia-sampled-3.json");
+    final String toReadFileNameAsJson3 = 
queryFramework().queryJsonMapper().writeValueAsString(toRead3.getAbsolutePath());
+
+    RowSignature rowSignature = RowSignature.builder()
+                                            .add("__time", ColumnType.LONG)
+                                            .add("cnt", ColumnType.LONG)
+                                            .build();
+
+    testIngestQuery().setSql(
+                         "insert into foo1 select "
+                         + "  floor(TIME_PARSE(\"timestamp\") to day) AS 
__time,\n"
+                         + "  count(*) as cnt\n"
+                         + "FROM TABLE(\n"
+                         + "  EXTERN(\n"
+                         + "    '{ \"files\": [" + toReadFileNameAsJson1 + "," 
+ toReadFileNameAsJson2 + "," + toReadFileNameAsJson3 + 
"],\"type\":\"local\"}',\n"
+                         + "    '{\"type\": \"json\"}',\n"
+                         + "    '[{\"name\": \"timestamp\", \"type\": 
\"string\"}, {\"name\": \"page\", \"type\": \"string\"}, {\"name\": \"user\", 
\"type\": \"string\"}]'\n"
+                         + "  )\n"
+                         + ") group by 1  PARTITIONED by day ")
+                     .setExpectedDataSource("foo1")
+                     .setQueryContext(localContext)
+                     .setExpectedRowSignature(rowSignature)
+                     .setExpectedSegment(ImmutableSet.of(SegmentId.of(
+                         "foo1",
+                         Intervals.of("2016-06-27/P1D"),
+                         "test",
+                         0
+                     )))
+                     .setExpectedResultRows(ImmutableList.of(new 
Object[]{1466985600000L, 20L}))
+                     .setExpectedWorkerCount(
+                         ImmutableMap.of(
+                             0, 1
+                         ))
+                     .verifyResults();
+
+  }
+
+  @Test
+  public void testCorrectNumberOfWorkersUsedAutoModeWithBytesLimit() throws 
IOException
+  {
+    Map<String, Object> localContext = new HashMap<>(context);
+    localContext.put(MultiStageQueryContext.CTX_TASK_ASSIGNMENT_STRATEGY, 
WorkerAssignmentStrategy.AUTO.name());
+    localContext.put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 4);
+    localContext.put(MultiStageQueryContext.CTX_MAX_INPUT_BYTES_PER_WORKER, 
10);
+
+    final File toRead1 = 
MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, 
"/multipleFiles/wikipedia-sampled-1.json");
+    final String toReadFileNameAsJson1 = 
queryFramework().queryJsonMapper().writeValueAsString(toRead1.getAbsolutePath());
+
+    final File toRead2 = 
MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, 
"/multipleFiles/wikipedia-sampled-2.json");
+    final String toReadFileNameAsJson2 = 
queryFramework().queryJsonMapper().writeValueAsString(toRead2.getAbsolutePath());
+
+    final File toRead3 = 
MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, 
"/multipleFiles/wikipedia-sampled-3.json");
+    final String toReadFileNameAsJson3 = 
queryFramework().queryJsonMapper().writeValueAsString(toRead3.getAbsolutePath());
+
+    RowSignature rowSignature = RowSignature.builder()
+                                            .add("__time", ColumnType.LONG)
+                                            .add("cnt", ColumnType.LONG)
+                                            .build();
+
+    testIngestQuery().setSql(
+                         "insert into foo1 select "
+                         + "  floor(TIME_PARSE(\"timestamp\") to day) AS 
__time,\n"
+                         + "  count(*) as cnt\n"
+                         + "FROM TABLE(\n"
+                         + "  EXTERN(\n"
+                         + "    '{ \"files\": [" + toReadFileNameAsJson1 + "," 
+ toReadFileNameAsJson2 + "," + toReadFileNameAsJson3 + 
"],\"type\":\"local\"}',\n"
+                         + "    '{\"type\": \"json\"}',\n"
+                         + "    '[{\"name\": \"timestamp\", \"type\": 
\"string\"}, {\"name\": \"page\", \"type\": \"string\"}, {\"name\": \"user\", 
\"type\": \"string\"}]'\n"
+                         + "  )\n"
+                         + ") group by 1  PARTITIONED by day ")
+                     .setExpectedDataSource("foo1")
+                     .setQueryContext(localContext)
+                     .setExpectedRowSignature(rowSignature)
+                     .setExpectedSegment(ImmutableSet.of(SegmentId.of(
+                         "foo1",
+                         Intervals.of("2016-06-27/P1D"),
+                         "test",
+                         0
+                     )))
+                     .setExpectedResultRows(ImmutableList.of(new 
Object[]{1466985600000L, 20L}))
+                     .setExpectedWorkerCount(
+                         ImmutableMap.of(
+                             0, 3
+                         ))
+                     .verifyResults();
+  }
+
   @Test
   public void testInsertArraysAutoType() throws IOException
   {
@@ -1120,7 +1220,6 @@ public class MSQInsertTest extends MSQTestBase
                      .setExpectedDataSource("foo1")
                      .setExpectedRowSignature(rowSignature)
                      .verifyResults();
-
   }
 
   @Nonnull
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/StageDefinitionTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/StageDefinitionTest.java
index 793c4293f1..20110b6332 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/StageDefinitionTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/StageDefinitionTest.java
@@ -26,7 +26,6 @@ import org.apache.druid.frame.key.ClusterBy;
 import org.apache.druid.frame.key.KeyColumn;
 import org.apache.druid.frame.key.KeyOrder;
 import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.msq.exec.Limits;
 import org.apache.druid.msq.input.stage.StageInputSpec;
 import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory;
 import org.apache.druid.msq.statistics.ClusterByStatisticsCollectorImpl;
@@ -57,8 +56,7 @@ public class StageDefinitionTest
         RowSignature.empty(),
         null,
         0,
-        false,
-        Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
+        false
     );
 
     Assert.assertThrows(ISE.class, () -> 
stageDefinition.generatePartitionBoundariesForShuffle(null));
@@ -79,8 +77,7 @@ public class StageDefinitionTest
             false
         ),
         1,
-        false,
-        Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
+        false
     );
 
     Assert.assertThrows(ISE.class, () -> 
stageDefinition.generatePartitionBoundariesForShuffle(null));
@@ -101,8 +98,7 @@ public class StageDefinitionTest
             false
         ),
         1,
-        false,
-        Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
+        false
     );
 
     Assert.assertThrows(
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/BaseControllerQueryKernelTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/BaseControllerQueryKernelTest.java
index 1e0177ae12..6ae18dda1e 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/BaseControllerQueryKernelTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/BaseControllerQueryKernelTest.java
@@ -27,6 +27,7 @@ import org.apache.druid.frame.key.RowKey;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.msq.exec.Limits;
 import org.apache.druid.msq.indexing.error.MSQFault;
 import org.apache.druid.msq.indexing.error.UnknownFault;
 import org.apache.druid.msq.input.InputSpecSlicerFactory;
@@ -232,7 +233,8 @@ public class BaseControllerQueryKernelTest extends 
InitializedNullHandlingTest
       return mapStageIdsToStageNumbers(
           controllerQueryKernel.createAndGetNewStageIds(
               inputSlicerFactory,
-              WorkerAssignmentStrategy.MAX
+              WorkerAssignmentStrategy.MAX,
+              Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
           )
       );
     }
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java
index 6a97e0b0bc..f59ae121d4 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java
@@ -25,6 +25,7 @@ import it.unimi.dsi.fastutil.ints.IntSet;
 import it.unimi.dsi.fastutil.longs.LongArrayList;
 import it.unimi.dsi.fastutil.longs.LongList;
 import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.msq.exec.Limits;
 import org.apache.druid.msq.input.InputSlice;
 import org.apache.druid.msq.input.InputSpec;
 import org.apache.druid.msq.input.InputSpecSlicer;
@@ -35,12 +36,19 @@ import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
 import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+
 public class WorkerInputsTest
 {
   private static final String QUERY_ID = "myQuery";
@@ -59,7 +67,8 @@ public class WorkerInputsTest
         stageDef,
         Int2IntMaps.EMPTY_MAP,
         new TestInputSpecSlicer(true),
-        WorkerAssignmentStrategy.MAX
+        WorkerAssignmentStrategy.MAX,
+        Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
     );
 
     Assert.assertEquals(
@@ -87,7 +96,8 @@ public class WorkerInputsTest
         stageDef,
         Int2IntMaps.EMPTY_MAP,
         new TestInputSpecSlicer(true),
-        WorkerAssignmentStrategy.MAX
+        WorkerAssignmentStrategy.MAX,
+        Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
     );
 
     Assert.assertEquals(
@@ -115,7 +125,8 @@ public class WorkerInputsTest
         stageDef,
         Int2IntMaps.EMPTY_MAP,
         new TestInputSpecSlicer(true),
-        WorkerAssignmentStrategy.AUTO
+        WorkerAssignmentStrategy.AUTO,
+        Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
     );
 
     Assert.assertEquals(
@@ -140,7 +151,8 @@ public class WorkerInputsTest
         stageDef,
         Int2IntMaps.EMPTY_MAP,
         new TestInputSpecSlicer(true),
-        WorkerAssignmentStrategy.AUTO
+        WorkerAssignmentStrategy.AUTO,
+        Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
     );
 
     Assert.assertEquals(
@@ -166,7 +178,8 @@ public class WorkerInputsTest
         stageDef,
         Int2IntMaps.EMPTY_MAP,
         new TestInputSpecSlicer(true),
-        WorkerAssignmentStrategy.AUTO
+        WorkerAssignmentStrategy.AUTO,
+        Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
     );
 
     Assert.assertEquals(
@@ -191,7 +204,8 @@ public class WorkerInputsTest
         stageDef,
         Int2IntMaps.EMPTY_MAP,
         new TestInputSpecSlicer(true),
-        WorkerAssignmentStrategy.AUTO
+        WorkerAssignmentStrategy.AUTO,
+        Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
     );
 
     Assert.assertEquals(
@@ -216,7 +230,8 @@ public class WorkerInputsTest
         stageDef,
         Int2IntMaps.EMPTY_MAP,
         new TestInputSpecSlicer(true),
-        WorkerAssignmentStrategy.AUTO
+        WorkerAssignmentStrategy.AUTO,
+        Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
     );
 
     Assert.assertEquals(
@@ -242,7 +257,8 @@ public class WorkerInputsTest
         stageDef,
         Int2IntMaps.EMPTY_MAP,
         new TestInputSpecSlicer(true),
-        WorkerAssignmentStrategy.AUTO
+        WorkerAssignmentStrategy.AUTO,
+        Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
     );
 
     Assert.assertEquals(
@@ -274,7 +290,8 @@ public class WorkerInputsTest
         stageDef,
         Int2IntMaps.EMPTY_MAP,
         new TestInputSpecSlicer(true),
-        WorkerAssignmentStrategy.AUTO
+        WorkerAssignmentStrategy.AUTO,
+        Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
     );
 
     Assert.assertEquals(
@@ -288,6 +305,127 @@ public class WorkerInputsTest
     );
   }
 
+  /**
+   * With the MAX strategy the input must always be sliced statically: the slicer's {@code canSliceDynamic} is never
+   * consulted and {@code sliceStatic} is called exactly once. One slice is produced per worker up to
+   * {@code maxWorkerCount} (3), so worker 0 receives the spec's single 4_000_000_000-byte entry and workers 1 and 2
+   * receive empty slices.
+   */
+  @Test
+  public void test_max_shouldAlwaysSplitStatic()
+  {
+    final TestInputSpec inputSpecToSplit = new TestInputSpec(4_000_000_000L);
+    final StageDefinition stageDef =
+        StageDefinition.builder(0)
+                       .inputs(inputSpecToSplit)
+                       .maxWorkerCount(3)
+                       .processorFactory(new OffsetLimitFrameProcessorFactory(0, 0L))
+                       .build(QUERY_ID);
+
+    // Spy on the slicer so the test can check which slicing path WorkerInputs chose.
+    final TestInputSpecSlicer testInputSpecSlicer = spy(new TestInputSpecSlicer(true));
+
+    final WorkerInputs inputs = WorkerInputs.create(
+        stageDef,
+        Int2IntMaps.EMPTY_MAP,
+        testInputSpecSlicer,
+        WorkerAssignmentStrategy.MAX,
+        Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
+    );
+
+    Mockito.verify(testInputSpecSlicer, Mockito.never()).canSliceDynamic(inputSpecToSplit);
+    Mockito.verify(testInputSpecSlicer, times(1)).sliceStatic(any(), anyInt());
+
+    Assert.assertEquals(
+        ImmutableMap.<Integer, List<InputSlice>>builder()
+                    .put(
+                        0,
+                        Collections.singletonList(new TestInputSlice(4_000_000_000L))
+                    )
+                    .put(
+                        1,
+                        Collections.singletonList(new TestInputSlice())
+                    )
+                    .put(
+                        2,
+                        Collections.singletonList(new TestInputSlice())
+                    )
+                    .build(),
+        inputs.assignmentsMap()
+    );
+  }
+
+  /**
+   * With the AUTO strategy and a slicer that reports it can slice dynamically, WorkerInputs must take the dynamic
+   * path: {@code canSliceDynamic} and {@code sliceDynamic} are each called exactly once. The per-worker byte limit
+   * here (100) is far below the size of each of the three 1_000_000_000-byte entries. Each entry is therefore
+   * expected to land on its own worker.
+   */
+  @Test
+  public void test_auto_shouldSplitDynamicIfPossible()
+  {
+    final TestInputSpec inputSpecToSplit = new TestInputSpec(1_000_000_000L, 1_000_000_000L, 1_000_000_000L);
+    final StageDefinition stageDef =
+        StageDefinition.builder(0)
+                       .inputs(inputSpecToSplit)
+                       .maxWorkerCount(3)
+                       .processorFactory(new OffsetLimitFrameProcessorFactory(0, 0L))
+                       .build(QUERY_ID);
+
+    // Spy on the slicer so the test can check which slicing path WorkerInputs chose.
+    final TestInputSpecSlicer testInputSpecSlicer = spy(new TestInputSpecSlicer(true));
+
+    final WorkerInputs inputs = WorkerInputs.create(
+        stageDef,
+        Int2IntMaps.EMPTY_MAP,
+        testInputSpecSlicer,
+        WorkerAssignmentStrategy.AUTO,
+        100 // deliberately tiny max input bytes per worker, smaller than any single entry
+    );
+
+    Mockito.verify(testInputSpecSlicer, times(1)).canSliceDynamic(inputSpecToSplit);
+    Mockito.verify(testInputSpecSlicer, times(1)).sliceDynamic(any(), anyInt(), anyInt(), anyLong());
+
+    Assert.assertEquals(
+        ImmutableMap.<Integer, List<InputSlice>>builder()
+                    .put(
+                        0,
+                        Collections.singletonList(new TestInputSlice(1_000_000_000L))
+                    )
+                    .put(
+                        1,
+                        Collections.singletonList(new TestInputSlice(1_000_000_000L))
+                    )
+                    .put(
+                        2,
+                        Collections.singletonList(new TestInputSlice(1_000_000_000L))
+                    )
+                    .build(),
+        inputs.assignmentsMap()
+    );
+  }
+
+  /**
+   * With the AUTO strategy and the default per-worker byte limit, WorkerInputs should use as few workers as
+   * possible. The three 1_000_000_000-byte entries are expected to be packed into a single slice on worker 0, and no
+   * other worker should receive an assignment. {@code canSliceDynamic} must be consulted exactly once.
+   */
+  @Test
+  public void test_auto_shouldUseLeastWorkersPossible()
+  {
+    final TestInputSpec inputSpecToSplit = new TestInputSpec(1_000_000_000L, 1_000_000_000L, 1_000_000_000L);
+    final StageDefinition stageDef =
+        StageDefinition.builder(0)
+                       .inputs(inputSpecToSplit)
+                       .maxWorkerCount(3)
+                       .processorFactory(new OffsetLimitFrameProcessorFactory(0, 0L))
+                       .build(QUERY_ID);
+
+    // Spy on the slicer so the test can check which slicing path WorkerInputs chose.
+    final TestInputSpecSlicer testInputSpecSlicer = spy(new TestInputSpecSlicer(true));
+
+    final WorkerInputs inputs = WorkerInputs.create(
+        stageDef,
+        Int2IntMaps.EMPTY_MAP,
+        testInputSpecSlicer,
+        WorkerAssignmentStrategy.AUTO,
+        Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
+    );
+
+    Mockito.verify(testInputSpecSlicer, times(1)).canSliceDynamic(inputSpecToSplit);
+
+    Assert.assertEquals(
+        ImmutableMap.<Integer, List<InputSlice>>builder()
+                    .put(
+                        0,
+                        Collections.singletonList(new TestInputSlice(1_000_000_000L, 1_000_000_000L, 1_000_000_000L))
+                    )
+                    .build(),
+        inputs.assignmentsMap()
+    );
+  }
+
   @Test
   public void testEquals()
   {
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index d768c73d80..e44d1974b8 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -225,6 +225,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
                   .put(QueryContexts.CTX_SQL_QUERY_ID, "test-query")
                   .put(QueryContexts.FINALIZE_KEY, true)
                   .put(QueryContexts.CTX_SQL_STRINGIFY_ARRAYS, false)
+                  .put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 2)
                   .put(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED, 0)
                   .build();
 
@@ -799,6 +800,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
     protected Matcher<Throwable> expectedExecutionErrorMatcher = null;
     protected MSQFault expectedMSQFault = null;
     protected Class<? extends MSQFault> expectedMSQFaultClass = null;
+    protected Map<Integer, Integer> expectedStageVsWorkerCount = new 
HashMap<>();
     protected final Map<Integer, Map<Integer, Map<String, 
CounterSnapshotMatcher>>>
         expectedStageWorkerChannelToCounters = new HashMap<>();
 
@@ -893,6 +895,12 @@ public class MSQTestBase extends BaseCalciteQueryTest
       return asBuilder();
     }
 
+    /**
+     * Sets the number of workers each stage is expected to have run with, keyed by stage number.
+     * {@code verifyWorkerCount} compares each value against the size of that stage's entry in the report's counters.
+     * The map is copied, so later changes to the caller's map do not alter the expectation, and a {@code null}
+     * argument fails here rather than later during verification.
+     *
+     * @param stageVsWorkerCount map of stage number to expected worker count
+     * @return this builder
+     */
+    public Builder setExpectedWorkerCount(Map<Integer, Integer> stageVsWorkerCount)
+    {
+      this.expectedStageVsWorkerCount = new HashMap<>(stageVsWorkerCount);
+      return asBuilder();
+    }
+
     public Builder setExpectedSegmentGenerationProgressCountersForStageWorker(
         CounterSnapshotMatcher counterSnapshot,
         int stage,
@@ -925,6 +933,14 @@ public class MSQTestBase extends BaseCalciteQueryTest
       MatcherAssert.assertThat(e, expectedValidationErrorMatcher);
     }
 
+    /**
+     * Asserts that, for every stage registered through {@code setExpectedWorkerCount}, the report's counters contain
+     * entries for exactly the expected number of workers. A missing counter tree or a missing stage is reported as an
+     * assertion failure naming the stage, instead of surfacing as a NullPointerException.
+     *
+     * @param counterSnapshotsTree counters taken from the task report payload
+     */
+    protected void verifyWorkerCount(CounterSnapshotsTree counterSnapshotsTree)
+    {
+      Assert.assertNotNull(counterSnapshotsTree);
+      final Map<Integer, Map<Integer, CounterSnapshots>> counterMap = counterSnapshotsTree.copyMap();
+      for (Map.Entry<Integer, Integer> stageWorkerCount : expectedStageVsWorkerCount.entrySet()) {
+        final Map<Integer, CounterSnapshots> workerCounters = counterMap.get(stageWorkerCount.getKey());
+        Assert.assertNotNull("No counters found for stage " + stageWorkerCount.getKey(), workerCounters);
+        Assert.assertEquals(
+            "Unexpected worker count for stage " + stageWorkerCount.getKey(),
+            stageWorkerCount.getValue().intValue(),
+            workerCounters.size()
+        );
+      }
+    }
+
     protected void verifyCounters(CounterSnapshotsTree counterSnapshotsTree)
     {
       Assert.assertNotNull(counterSnapshotsTree);
@@ -1061,6 +1077,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
           return;
         }
         MSQTaskReportPayload reportPayload = getPayloadOrThrow(controllerId);
+        verifyWorkerCount(reportPayload.getCounters());
         verifyCounters(reportPayload.getCounters());
 
         MSQSpec foundSpec = 
indexingServiceClient.getQuerySpecForTask(controllerId);
@@ -1270,6 +1287,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
 
         MSQTaskReportPayload payload = getPayloadOrThrow(controllerId);
         verifyCounters(payload.getCounters());
+        verifyWorkerCount(payload.getCounters());
 
         if (payload.getStatus().getErrorReport() != null) {
           throw new ISE("Query %s failed due to %s", sql, 
payload.getStatus().getErrorReport().toString());
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
index 73cd900a5b..58ee01f008 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
@@ -63,10 +63,12 @@ import java.util.stream.Collectors;
 public class MSQTestControllerContext implements ControllerContext
 {
   private static final Logger log = new Logger(MSQTestControllerContext.class);
+  private static final int NUM_WORKERS = 4;
   private final TaskActionClient taskActionClient;
   private final Map<String, Worker> inMemoryWorkers = new HashMap<>();
   private final ConcurrentMap<String, TaskStatus> statusMap = new 
ConcurrentHashMap<>();
-  private final ListeningExecutorService executor = 
MoreExecutors.listeningDecorator(Execs.singleThreaded(
+  private final ListeningExecutorService executor = 
MoreExecutors.listeningDecorator(Execs.multiThreaded(
+      NUM_WORKERS,
       "MultiStageQuery-test-controller-client"));
   private final CoordinatorClient coordinatorClient;
   private final DruidNode node = new DruidNode(
diff --git a/extensions-core/multi-stage-query/src/test/resources/multipleFiles/wikipedia-sampled-1.json b/extensions-core/multi-stage-query/src/test/resources/multipleFiles/wikipedia-sampled-1.json
new file mode 100644
index 0000000000..26002ac687
--- /dev/null
+++ b/extensions-core/multi-stage-query/src/test/resources/multipleFiles/wikipedia-sampled-1.json
@@ -0,0 +1,7 @@
+{"isRobot":true,"channel":"#sv.wikipedia","timestamp":"2016-06-27T00:00:11.080Z","flags":"NB","isUnpatrolled":false,"page":"Salo
 
Toraut","diffUrl":"https://sv.wikipedia.org/w/index.php?oldid=36099284&rcid=89369918","added":31,"comment":"Botskapande
 Indonesien 
omdirigering","commentLength":35,"isNew":true,"isMinor":false,"delta":31,"isAnonymous":false,"user":"Lsjbot","deltaBucket":0.0,"deleted":0,"namespace":"Main"}
+{"isRobot":false,"channel":"#en.wikipedia","cityName":"Buenos 
Aires","timestamp":"2016-06-27T00:00:34.959Z","flags":"","isUnpatrolled":false,"page":"Bailando
 
2015","countryName":"Argentina","regionIsoCode":"C","diffUrl":"https://en.wikipedia.org/w/index.php?diff=727144213&oldid=727144184","added":2,"metroCode":null,"comment":"/*
 Scores 
*/","commentLength":12,"isNew":false,"isMinor":false,"delta":2,"countryIsoCode":"AR","isAnonymous":true,"user":"181.230.118.178","regionName":"Buenos
 Aire [...]
+{"isRobot":false,"channel":"#en.wikipedia","timestamp":"2016-06-27T00:00:36.027Z","flags":"M","isUnpatrolled":false,"page":"Richie
 Rich's Christmas 
Wish","diffUrl":"https://en.wikipedia.org/w/index.php?diff=727144214&oldid=716794477","added":0,"comment":"standard
 term is [[title 
character]]","commentLength":36,"isNew":false,"isMinor":true,"delta":-2,"isAnonymous":false,"user":"JasonAQuest","deltaBucket":-100.0,"deleted":2,"namespace":"Main"}
+{"isRobot":true,"channel":"#pl.wikipedia","timestamp":"2016-06-27T00:00:58.599Z","flags":"NB","isUnpatrolled":false,"page":"Kategoria:Dyskusje
 nad usunięciem artykułu zakończone bez konsensusu − lipiec 
2016","diffUrl":"https://pl.wikipedia.org/w/index.php?oldid=46204477&rcid=68522573","added":270,"comment":"utworzenie
 
kategorii","commentLength":20,"isNew":true,"isMinor":false,"delta":270,"isAnonymous":false,"user":"Beau.bot","deltaBucket":200.0,"deleted":0,"namespace":"Kategoria"}
+{"isRobot":true,"channel":"#sh.wikipedia","timestamp":"2016-06-27T00:01:03.685Z","flags":"MB","isUnpatrolled":false,"page":"El
 Terco, 
Bachíniva","diffUrl":"https://sh.wikipedia.org/w/index.php?diff=29388566&oldid=29282572&rcid=34712923","added":0,"comment":"Bot:
 Automatska zamjena teksta  (-[[Administrativna podjela Meksika|Admin]] 
+[[Administrativna podjela 
Meksika|Admi]])","commentLength":118,"isNew":false,"isMinor":true,"delta":-1,"isAnonymous":false,"user":"Kolega2357","deltaBucket":
 [...]
+{"isRobot":true,"channel":"#ceb.wikipedia","timestamp":"2016-06-27T00:01:07.347Z","flags":"NB","isUnpatrolled":false,"page":"Neqerssuaq","diffUrl":"https://ceb.wikipedia.org/w/index.php?oldid=9563239&rcid=36193146","added":4150,"comment":"Paghimo
 ni bot 
Greenland","commentLength":24,"isNew":true,"isMinor":false,"delta":4150,"isAnonymous":false,"user":"Lsjbot","deltaBucket":4100.0,"deleted":0,"namespace":"Main"}
+{"isRobot":false,"channel":"#es.wikipedia","cityName":null,"timestamp":"2016-06-27T00:01:14.343Z","flags":"","isUnpatrolled":false,"page":"Sumo
 
(banda)","countryName":"Argentina","regionIsoCode":null,"diffUrl":"https://es.wikipedia.org/w/index.php?diff=91937261&oldid=91937229","added":0,"metroCode":null,"comment":"/*
 Línea de tiempo 
*/","commentLength":21,"isNew":false,"isMinor":false,"delta":-173,"countryIsoCode":"AR","isAnonymous":true,"user":"181.110.165.189","regionName":null,"deltaB
 [...]
diff --git a/extensions-core/multi-stage-query/src/test/resources/multipleFiles/wikipedia-sampled-2.json b/extensions-core/multi-stage-query/src/test/resources/multipleFiles/wikipedia-sampled-2.json
new file mode 100644
index 0000000000..ae560bf8f2
--- /dev/null
+++ b/extensions-core/multi-stage-query/src/test/resources/multipleFiles/wikipedia-sampled-2.json
@@ -0,0 +1,7 @@
+{"isRobot":false,"channel":"#en.wikipedia","timestamp":"2016-06-27T00:01:59.599Z","flags":"","isUnpatrolled":false,"page":"Panama
 
Canal","diffUrl":"https://en.wikipedia.org/w/index.php?diff=727144327&oldid=727144068","added":496,"comment":"expanding
 lead + missing 
RS","commentLength":27,"isNew":false,"isMinor":false,"delta":496,"isAnonymous":false,"user":"Mariordo","deltaBucket":400.0,"deleted":0,"namespace":"Main"}
+{"isRobot":false,"channel":"#ru.wikipedia","timestamp":"2016-06-27T00:02:09.238Z","flags":"","isUnpatrolled":false,"page":"Википедия:Опросы/Унификация
 шаблонов «Не 
переведено»","diffUrl":"https://ru.wikipedia.org/w/index.php?diff=79213888&oldid=79213880","added":196,"comment":"/*
 Нет 
*/","commentLength":9,"isNew":false,"isMinor":false,"delta":196,"isAnonymous":false,"user":"Wanderer777","deltaBucket":100.0,"deleted":0,"namespace":"Википедия"}
+{"isRobot":true,"channel":"#sh.wikipedia","timestamp":"2016-06-27T00:02:10.857Z","flags":"MB","isUnpatrolled":false,"page":"Hermanos
 Díaz, 
Ascensión","diffUrl":"https://sh.wikipedia.org/w/index.php?diff=29388624&oldid=29282630&rcid=34712981","added":0,"comment":"Bot:
 Automatska zamjena teksta  (-[[Administrativna podjela Meksika|Admin]] 
+[[Administrativna podjela 
Meksika|Admi]])","commentLength":118,"isNew":false,"isMinor":true,"delta":-1,"isAnonymous":false,"user":"Kolega2357","deltaBuc
 [...]
+{"isRobot":false,"channel":"#es.wikipedia","timestamp":"2016-06-27T01:02:13.153Z","flags":"","isUnpatrolled":false,"page":"Clasificación
 para la Eurocopa Sub-21 de 
2017","diffUrl":"https://es.wikipedia.org/w/index.php?diff=91937277&oldid=91937272","added":4,"comment":"/*
 Máximos Asistentes 
*/","commentLength":24,"isNew":false,"isMinor":false,"delta":4,"isAnonymous":false,"user":"Guly600","deltaBucket":0.0,"deleted":0,"namespace":"Main"}
+{"isRobot":false,"channel":"#zh.wikipedia","timestamp":"2016-06-27T01:02:13.815Z","flags":"","isUnpatrolled":false,"page":"中共十八大以来的反腐败工作","diffUrl":"https://zh.wikipedia.org/w/index.php?diff=40605390&oldid=40605381","added":18,"comment":"/*
 违反中共中央八项规定官员(副部级及以上) 
*/","commentLength":26,"isNew":false,"isMinor":false,"delta":18,"isAnonymous":false,"user":"2001:DA8:207:E132:94DC:BA03:DFDF:8F9F","deltaBucket":0.0,"deleted":0,"namespace":"Main"}
+{"isRobot":true,"channel":"#sh.wikipedia","timestamp":"2016-06-27T01:02:15.952Z","flags":"MB","isUnpatrolled":false,"page":"El
 Sicomoro, 
Ascensión","diffUrl":"https://sh.wikipedia.org/w/index.php?diff=29388628&oldid=29282634&rcid=34712985","added":0,"comment":"Bot:
 Automatska zamjena teksta  (-[[Administrativna podjela Meksika|Admin]] 
+[[Administrativna podjela 
Meksika|Admi]])","commentLength":118,"isNew":false,"isMinor":true,"delta":-1,"isAnonymous":false,"user":"Kolega2357","deltaBucke
 [...]
+{"isRobot":false,"channel":"#id.wikipedia","timestamp":"2016-06-27T01:02:20.008Z","flags":"!","isUnpatrolled":true,"page":"Ibnu
 
Sina","diffUrl":"https://id.wikipedia.org/w/index.php?diff=11687177&oldid=11444059&rcid=20812462","added":106,"comment":"gambar","commentLength":6,"isNew":false,"isMinor":false,"delta":106,"isAnonymous":false,"user":"Ftihikam","deltaBucket":100.0,"deleted":0,"namespace":"Main"}
diff --git a/extensions-core/multi-stage-query/src/test/resources/multipleFiles/wikipedia-sampled-3.json b/extensions-core/multi-stage-query/src/test/resources/multipleFiles/wikipedia-sampled-3.json
new file mode 100644
index 0000000000..6604b51fb1
--- /dev/null
+++ b/extensions-core/multi-stage-query/src/test/resources/multipleFiles/wikipedia-sampled-3.json
@@ -0,0 +1,6 @@
+{"isRobot":false,"channel":"#pt.wikipedia","timestamp":"2016-06-27T02:02:22.868Z","flags":"N","isUnpatrolled":false,"page":"Dobromir
 
Zhechev","diffUrl":"https://pt.wikipedia.org/w/index.php?oldid=46012430&rcid=67145794","added":1926,"comment":"Novo","commentLength":4,"isNew":true,"isMinor":false,"delta":1926,"isAnonymous":false,"user":"Ceresta","deltaBucket":1900.0,"deleted":0,"namespace":"Main"}
+{"isRobot":true,"channel":"#de.wikipedia","timestamp":"2016-06-27T02:02:29.228Z","flags":"B","isUnpatrolled":false,"page":"Benutzer
 
Diskussion:Squasher/Archiv/2016","diffUrl":"https://de.wikipedia.org/w/index.php?diff=155658493&oldid=155085489","added":2560,"comment":"1
  Abschnitt aus [[Benutzer Diskussion:Squasher]] 
archiviert","commentLength":60,"isNew":false,"isMinor":false,"delta":2560,"isAnonymous":false,"user":"TaxonBot","deltaBucket":2500.0,"deleted":0,"namespace":"Benutzer
 Diskussion"}
+{"isRobot":true,"channel":"#sh.wikipedia","timestamp":"2016-06-27T02:02:50.958Z","flags":"MB","isUnpatrolled":false,"page":"Trinidad
 Jiménez G., Benemérito de las 
Américas","diffUrl":"https://sh.wikipedia.org/w/index.php?diff=29388657&oldid=29282663&rcid=34713014","added":0,"comment":"Bot:
 Automatska zamjena teksta  (-[[Administrativna podjela Meksika|Admin]] 
+[[Administrativna podjela 
Meksika|Admi]])","commentLength":118,"isNew":false,"isMinor":true,"delta":-1,"isAnonymous":false,"user"
 [...]
+{"isRobot":false,"channel":"#zh.wikipedia","timestamp":"2016-06-27T02:02:59.521Z","flags":"N","isUnpatrolled":false,"page":"Wikipedia:頁面存廢討論/記錄/2016/06/27","diffUrl":"https://zh.wikipedia.org/w/index.php?oldid=40605393&rcid=62748523","added":1986,"comment":"添加[[李洛能八大弟子]]","commentLength":13,"isNew":true,"isMinor":false,"delta":1986,"isAnonymous":false,"user":"Tigerzeng","deltaBucket":1900.0,"deleted":0,"namespace":"Wikipedia"}
+{"isRobot":false,"channel":"#en.wikipedia","timestamp":"2016-06-27T02:03:01.090Z","flags":"","isUnpatrolled":false,"page":"File:Paint.net
 4.0.6 
screenshot.png","diffUrl":"https://en.wikipedia.org/w/index.php?diff=727144426&oldid=713167833","added":0,"comment":"/*
 Summary 
*/","commentLength":13,"isNew":false,"isMinor":false,"delta":-463,"isAnonymous":false,"user":"Calvin
 Hogg","deltaBucket":-500.0,"deleted":463,"namespace":"File"}
+{"isRobot":true,"channel":"#de.wikipedia","timestamp":"2016-06-27T02:03:05.240Z","flags":"B","isUnpatrolled":false,"page":"Benutzer
 
Diskussion:HerrSonderbar","diffUrl":"https://de.wikipedia.org/w/index.php?diff=155658496&oldid=155657380","added":364,"comment":"Neuer
 Abschnitt /* Benachrichtigung über inaktive Mentees am 27. 6. 2016 
*/","commentLength":75,"isNew":false,"isMinor":false,"delta":364,"isAnonymous":false,"user":"GiftBot","deltaBucket":300.0,"deleted":0,"namespace":"Benutzer
 Di [...]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to