gianm commented on code in PR #13955:
URL: https://github.com/apache/druid/pull/13955#discussion_r1149865250


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicer.java:
##########
@@ -59,91 +55,193 @@ public boolean canSliceDynamic(InputSpec inputSpec)
   public List<InputSlice> sliceStatic(InputSpec inputSpec, int maxNumSlices)
   {
     final ExternalInputSpec externalInputSpec = (ExternalInputSpec) inputSpec;
-    final InputSource inputSource = externalInputSpec.getInputSource();
-    final InputFormat inputFormat = externalInputSpec.getInputFormat();
-    final RowSignature signature = externalInputSpec.getSignature();
-
-    // Worker number -> input source for that worker.
-    final List<List<InputSource>> workerInputSourcess;
-
-    // Figure out input splits for each worker.
-    if (inputSource.isSplittable()) {
-      //noinspection unchecked
-      final SplittableInputSource<Object> splittableInputSource = (SplittableInputSource<Object>) inputSource;
-
-      try {
-        workerInputSourcess = SlicerUtils.makeSlices(
-            splittableInputSource.createSplits(inputFormat, FilePerSplitHintSpec.INSTANCE)
-                                 .map(splittableInputSource::withSplit)
-                                 .iterator(),
-            maxNumSlices
-        );
-      }
-      catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    } else {
-      workerInputSourcess = Collections.singletonList(Collections.singletonList(inputSource));
-    }
 
-    // Sanity check. It is a bug in this method if this exception is ever thrown.
-    if (workerInputSourcess.size() > maxNumSlices) {
-      throw new ISE("Generated too many slices [%d > %d]", workerInputSourcess.size(), maxNumSlices);
+    if (externalInputSpec.getInputSource().isSplittable()) {
+      return sliceSplittableInputSource(
+          externalInputSpec,
+          new StaticSplitHintSpec(maxNumSlices),
+          maxNumSlices
+      );
+    } else {
+      return sliceUnsplittableInputSource(externalInputSpec);
     }
-
-    return IntStream.range(0, maxNumSlices)
-                    .mapToObj(
-                        workerNumber -> {
-                          final List<InputSource> workerInputSources;
-
-                          if (workerNumber < workerInputSourcess.size()) {
-                            workerInputSources = workerInputSourcess.get(workerNumber);
-                          } else {
-                            workerInputSources = Collections.emptyList();
-                          }
-
-                          if (workerInputSources.isEmpty()) {
-                            return NilInputSlice.INSTANCE;
-                          } else {
-                            return new ExternalInputSlice(workerInputSources, inputFormat, signature);
-                          }
-                        }
-                    )
-                    .collect(Collectors.toList());
   }
 
   @Override
   public List<InputSlice> sliceDynamic(
-      InputSpec inputSpec,
-      int maxNumSlices,
-      int maxFilesPerSlice,
-      long maxBytesPerSlice
+      final InputSpec inputSpec,
+      final int maxNumSlices,
+      final int maxFilesPerSlice,
+      final long maxBytesPerSlice
   )
   {
     final ExternalInputSpec externalInputSpec = (ExternalInputSpec) inputSpec;
 
-    if (!externalInputSpec.getInputSource().isSplittable()) {
-      return sliceStatic(inputSpec, 1);
+    if (externalInputSpec.getInputSource().isSplittable()) {
+      return sliceSplittableInputSource(
+          externalInputSpec,
+          new DynamicSplitHintSpec(maxNumSlices, maxFilesPerSlice, maxBytesPerSlice),
+          maxNumSlices
+      );
+    } else {
+      return sliceUnsplittableInputSource(externalInputSpec);
     }
+  }
 
-    final SplittableInputSource<?> inputSource = (SplittableInputSource<?>) externalInputSpec.getInputSource();
-    final MaxSizeSplitHintSpec maxSizeSplitHintSpec = new MaxSizeSplitHintSpec(
-        new HumanReadableBytes(maxBytesPerSlice),
-        maxFilesPerSlice
+  /**
+   * "Slice" an unsplittable input source into a single slice.
+   */
+  private static List<InputSlice> sliceUnsplittableInputSource(final ExternalInputSpec inputSpec)
+  {
+    return Collections.singletonList(
+        new ExternalInputSlice(
+            Collections.singletonList(inputSpec.getInputSource()),
+            inputSpec.getInputFormat(),
+            inputSpec.getSignature()
+        )
     );
+  }
+
+  /**
+   * Slice a {@link SplittableInputSource} using a {@link SplitHintSpec}.
+   */
+  private static List<InputSlice> sliceSplittableInputSource(
+      final ExternalInputSpec inputSpec,
+      final SplitHintSpec splitHintSpec,
+      final int maxNumSlices

Review Comment:
   > Not clear how this can be done per input source. The number of slices 
would seem to be a constraint based on total resources and total file count.
   
   This is a good point. I suppose there's an unstated assumption that there 
will not be more than one splittable input source per stage. That's true of 
everything I can think of. So I think the question you raise, while 
interesting, is currently academic.
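   To illustrate why the per-spec assumption is harmless today, here is a minimal, hypothetical sketch (not Druid code; `PerSpecSlicingSketch` and its methods are made up for illustration). It assumes each splittable spec yields at most one slice per file and is sliced independently with the full `maxNumSlices` budget. With one splittable spec, the per-spec answer matches a global one; with two, independent slicing can produce up to twice the budget, which is the situation the quoted concern describes.

   ```java
   import java.util.List;

   /**
    * Hypothetical illustration, not Druid code: each splittable input spec is
    * sliced on its own, and each one is allowed the full maxNumSlices budget.
    */
   public class PerSpecSlicingSketch
   {
     /** Assumption: at most one slice per file, capped at maxNumSlices. */
     static int slicesForOneSpec(int numFiles, int maxNumSlices)
     {
       return Math.min(numFiles, maxNumSlices);
     }

     /** Total slice count when every spec is sliced independently. */
     static int totalSlicesIndependently(List<Integer> filesPerSpec, int maxNumSlices)
     {
       int total = 0;
       for (int numFiles : filesPerSpec) {
         total += slicesForOneSpec(numFiles, maxNumSlices);
       }
       return total;
     }

     public static void main(String[] args)
     {
       // One splittable spec: the per-spec view and a global view agree.
       System.out.println(totalSlicesIndependently(List.of(100), 10)); // 10
       // Two splittable specs: independent slicing uses twice the budget.
       System.out.println(totalSlicesIndependently(List.of(100, 100), 10)); // 20
     }
   }
   ```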
   
   If we need to extend this to handle multiple splittable input sources per 
stage, we'll definitely need to refactor somewhat. I'll add the following 
comment to `InputSpecSlicer#sliceDynamic` mentioning that. (It's really an 
issue with dynamic slicing in general.)
   
   ```
      * The design of this method assumes that the ideal number of {@link InputSlice} can be determined by looking at
      * just one {@link InputSpec} at a time. This makes sense today, since there are no situations where a
      * {@link org.apache.druid.msq.kernel.StageDefinition} would be created with two {@link InputSpec} other than
      * {@link org.apache.druid.msq.input.stage.StageInputSpec} (which is not dynamically splittable, so would not
      * use this method anyway). If this changes in the future, we'll want to revisit the design of this method.
   
   ```
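   As a rough illustration of how a dynamic split hint might combine per-slice limits with `maxNumSlices` for a single input source, here is a hypothetical sketch. `DynamicSlicingSketch`, the greedy packing, and the round-robin fallback are assumptions made for illustration; this is not the actual behavior of `DynamicSplitHintSpec` or `MaxSizeSplitHintSpec`. Files are modeled as a list of sizes in bytes, and `maxNumSlices` is assumed to be at least 1. Because it only sees one input's files, it embodies the same one-`InputSpec`-at-a-time assumption described in the Javadoc above.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   /**
    * Hypothetical sketch, not Druid's DynamicSplitHintSpec: greedily packs file
    * sizes into slices that respect per-slice file and byte limits, and falls
    * back to round-robin over maxNumSlices slices if packing produces too many.
    */
   public class DynamicSlicingSketch
   {
     static List<List<Long>> slice(
         List<Long> fileSizes,
         int maxNumSlices,
         int maxFilesPerSlice,
         long maxBytesPerSlice
     )
     {
       final List<List<Long>> slices = new ArrayList<>();
       List<Long> current = new ArrayList<>();
       long currentBytes = 0;

       for (long size : fileSizes) {
         // Close the current slice if adding this file would break either limit.
         // The non-empty check lets a single oversized file occupy its own slice.
         if (!current.isEmpty()
             && (current.size() >= maxFilesPerSlice || currentBytes + size > maxBytesPerSlice)) {
           slices.add(current);
           current = new ArrayList<>();
           currentBytes = 0;
         }
         current.add(size);
         currentBytes += size;
       }
       if (!current.isEmpty()) {
         slices.add(current);
       }

       if (slices.size() <= maxNumSlices) {
         return slices;
       }

       // Too many slices: drop the per-slice limits and spread files round-robin.
       final List<List<Long>> capped = new ArrayList<>();
       for (int i = 0; i < maxNumSlices; i++) {
         capped.add(new ArrayList<>());
       }
       for (int i = 0; i < fileSizes.size(); i++) {
         capped.get(i % maxNumSlices).add(fileSizes.get(i));
       }
       return capped;
     }

     public static void main(String[] args)
     {
       final List<Long> sizes = List.of(10L, 10L, 10L, 10L, 10L);
       // Two files per slice gives 3 slices, within maxNumSlices = 4.
       System.out.println(slice(sizes, 4, 2, 100L).size()); // 3
       // Same limits with maxNumSlices = 2 triggers the round-robin fallback.
       System.out.println(slice(sizes, 2, 2, 100L)); // [[10, 10, 10], [10, 10]]
     }
   }
   ```

   The fallback trades the per-slice limits for a hard cap on slice count; a real implementation could weigh those two constraints differently.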



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
