gianm commented on code in PR #13955:
URL: https://github.com/apache/druid/pull/13955#discussion_r1149865250
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicer.java:
##########
@@ -59,91 +55,193 @@ public boolean canSliceDynamic(InputSpec inputSpec)
public List<InputSlice> sliceStatic(InputSpec inputSpec, int maxNumSlices)
{
final ExternalInputSpec externalInputSpec = (ExternalInputSpec) inputSpec;
- final InputSource inputSource = externalInputSpec.getInputSource();
- final InputFormat inputFormat = externalInputSpec.getInputFormat();
- final RowSignature signature = externalInputSpec.getSignature();
-
- // Worker number -> input source for that worker.
- final List<List<InputSource>> workerInputSourcess;
-
- // Figure out input splits for each worker.
- if (inputSource.isSplittable()) {
- //noinspection unchecked
- final SplittableInputSource<Object> splittableInputSource = (SplittableInputSource<Object>) inputSource;
-
- try {
- workerInputSourcess = SlicerUtils.makeSlices(
- splittableInputSource.createSplits(inputFormat, FilePerSplitHintSpec.INSTANCE)
- .map(splittableInputSource::withSplit)
- .iterator(),
- maxNumSlices
- );
- }
- catch (IOException e) {
- throw new RuntimeException(e);
- }
- } else {
- workerInputSourcess = Collections.singletonList(Collections.singletonList(inputSource));
- }
- // Sanity check. It is a bug in this method if this exception is ever thrown.
- if (workerInputSourcess.size() > maxNumSlices) {
- throw new ISE("Generated too many slices [%d > %d]", workerInputSourcess.size(), maxNumSlices);
+ if (externalInputSpec.getInputSource().isSplittable()) {
+ return sliceSplittableInputSource(
+ externalInputSpec,
+ new StaticSplitHintSpec(maxNumSlices),
+ maxNumSlices
+ );
+ } else {
+ return sliceUnsplittableInputSource(externalInputSpec);
}
-
- return IntStream.range(0, maxNumSlices)
- .mapToObj(
- workerNumber -> {
- final List<InputSource> workerInputSources;
-
- if (workerNumber < workerInputSourcess.size()) {
- workerInputSources = workerInputSourcess.get(workerNumber);
- } else {
- workerInputSources = Collections.emptyList();
- }
-
- if (workerInputSources.isEmpty()) {
- return NilInputSlice.INSTANCE;
- } else {
- return new ExternalInputSlice(workerInputSources, inputFormat, signature);
- }
- }
- )
- .collect(Collectors.toList());
}
@Override
public List<InputSlice> sliceDynamic(
- InputSpec inputSpec,
- int maxNumSlices,
- int maxFilesPerSlice,
- long maxBytesPerSlice
+ final InputSpec inputSpec,
+ final int maxNumSlices,
+ final int maxFilesPerSlice,
+ final long maxBytesPerSlice
)
{
final ExternalInputSpec externalInputSpec = (ExternalInputSpec) inputSpec;
- if (!externalInputSpec.getInputSource().isSplittable()) {
- return sliceStatic(inputSpec, 1);
+ if (externalInputSpec.getInputSource().isSplittable()) {
+ return sliceSplittableInputSource(
+ externalInputSpec,
+ new DynamicSplitHintSpec(maxNumSlices, maxFilesPerSlice, maxBytesPerSlice),
+ maxNumSlices
+ );
+ } else {
+ return sliceUnsplittableInputSource(externalInputSpec);
}
+ }
- final SplittableInputSource<?> inputSource = (SplittableInputSource<?>) externalInputSpec.getInputSource();
- final MaxSizeSplitHintSpec maxSizeSplitHintSpec = new MaxSizeSplitHintSpec(
- new HumanReadableBytes(maxBytesPerSlice),
- maxFilesPerSlice
+ /**
+ * "Slice" an unsplittable input source into a single slice.
+ */
+ private static List<InputSlice> sliceUnsplittableInputSource(final ExternalInputSpec inputSpec)
+ {
+ return Collections.singletonList(
+ new ExternalInputSlice(
+ Collections.singletonList(inputSpec.getInputSource()),
+ inputSpec.getInputFormat(),
+ inputSpec.getSignature()
+ )
);
+ }
+
+ /**
+ * Slice a {@link SplittableInputSource} using a {@link SplitHintSpec}.
+ */
+ private static List<InputSlice> sliceSplittableInputSource(
+ final ExternalInputSpec inputSpec,
+ final SplitHintSpec splitHintSpec,
+ final int maxNumSlices
Review Comment:
> Not clear how this can be done per input source. The number of slices would seem to be a constraint based on total resources and total file count.
This is a good point. I suppose there's an unstated assumption that there
will not be more than one splittable input source per stage. That's true of
everything I can think of. So I think the question you raise, while
interesting, is currently academic.
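To make the single-splittable-source assumption concrete: with only one splittable source in a stage, slicing that source is the same as slicing the whole stage, so a per-spec slice count is also the stage's slice count. The sketch below is loosely modeled on the lines this PR removes, which gave each of `maxNumSlices` workers a list of inputs and turned empty lists into `NilInputSlice`. Splits are modeled as plain values, the class name is hypothetical, and round-robin assignment is an assumption, not necessarily what `SlicerUtils.makeSlices` does.
```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: static slicing of ONE splittable input source, with
// splits modeled as plain values. Round-robin assignment is an assumption,
// not necessarily what SlicerUtils.makeSlices does.
final class StaticSliceSketch
{
  private StaticSliceSketch() {}

  static <T> List<List<T>> sliceStatic(final List<T> splits, final int maxNumSlices)
  {
    if (maxNumSlices <= 0) {
      throw new IllegalArgumentException("maxNumSlices must be positive");
    }
    // One slot per worker. A slot that stays empty plays the role of
    // NilInputSlice in the removed code.
    final List<List<T>> slices = new ArrayList<>(maxNumSlices);
    for (int i = 0; i < maxNumSlices; i++) {
      slices.add(new ArrayList<>());
    }
    // Split i goes to worker (i % maxNumSlices), so no worker gets more than
    // one split more than any other.
    for (int i = 0; i < splits.size(); i++) {
      slices.get(i % maxNumSlices).add(splits.get(i));
    }
    return slices;
  }
}
```
For example, slicing three splits `a`, `b`, `c` across two workers yields `[[a, c], [b]]`, and a single split across three workers yields one populated slice and two empty (nil) ones. With two splittable sources, each would be sliced against the full `maxNumSlices` independently, which is exactly the gap the quoted comment points out.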
If we need to extend this to handle multiple splittable input sources per
stage, we'll definitely need to refactor somewhat. I'll add the following
comment to `InputSpecSlicer#sliceDynamic` mentioning that. (It's really an
issue dynamic slicing generally.)
```
* The design of this method assumes that the ideal number of {@link InputSlice} can be determined by looking at
* just one {@link InputSpec} at a time. This makes sense today, since there are no situations where a
* {@link org.apache.druid.msq.kernel.StageDefinition} would be created with two {@link InputSpec} other than
* {@link org.apache.druid.msq.input.stage.StageInputSpec} (which is not dynamically splittable, so would not
* use this method anyway). If this changes in the future, we'll want to revisit the design of this method.
```
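`sliceDynamic` hands three limits to `new DynamicSplitHintSpec(maxNumSlices, maxFilesPerSlice, maxBytesPerSlice)`, but the body of `sliceSplittableInputSource` is cut off in this hunk. The following is only a minimal sketch, under stated assumptions, of how such limits could interact for a single splittable source: files, modeled as a list of byte sizes, are packed greedily in order, and if that needs more than `maxNumSlices` slices they are redistributed round-robin. The class name, the greedy packing, and the round-robin fallback are all hypothetical and are not taken from Druid.
```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Druid's DynamicSplitHintSpec: greedy packing of
// file sizes into slices bounded by file count and byte size, for ONE
// splittable input source, with an assumed round-robin fallback when the
// packing needs more than maxNumSlices slices.
final class DynamicSliceSketch
{
  private DynamicSliceSketch() {}

  static List<List<Long>> sliceDynamic(
      final List<Long> fileSizes,
      final int maxNumSlices,
      final int maxFilesPerSlice,
      final long maxBytesPerSlice
  )
  {
    if (maxNumSlices <= 0 || maxFilesPerSlice <= 0) {
      throw new IllegalArgumentException("maxNumSlices and maxFilesPerSlice must be positive");
    }

    // Pass 1: pack files in order, closing a slice when the next file would
    // exceed either limit. An oversized file still gets a slice of its own.
    final List<List<Long>> slices = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long currentBytes = 0;
    for (final long size : fileSizes) {
      final boolean full = current.size() >= maxFilesPerSlice
                           || (!current.isEmpty() && currentBytes + size > maxBytesPerSlice);
      if (full) {
        slices.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(size);
      currentBytes += size;
    }
    if (!current.isEmpty()) {
      slices.add(current);
    }

    if (slices.size() <= maxNumSlices) {
      return slices;
    }

    // Pass 2 (assumed fallback): greedy packing needed more slices than there
    // are workers, so spread the files round-robin over exactly maxNumSlices
    // slices, relaxing the per-slice limits.
    final List<List<Long>> capped = new ArrayList<>(maxNumSlices);
    for (int i = 0; i < maxNumSlices; i++) {
      capped.add(new ArrayList<>());
    }
    for (int i = 0; i < fileSizes.size(); i++) {
      capped.get(i % maxNumSlices).add(fileSizes.get(i));
    }
    return capped;
  }
}
```
For example, `sliceDynamic(List.of(60L, 60L, 30L), 10, 10, 100L)` returns `[[60], [60, 30]]`. The fallback favors respecting the worker count over the per-slice limits; a real implementation could reasonably choose differently, for example by failing instead. Note that the sketch only ever sees one source's files, which is the single-`InputSpec` assumption the comment above documents.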
--
This is an automated message from the Apache Git Service.