gianm commented on code in PR #13955:
URL: https://github.com/apache/druid/pull/13955#discussion_r1149869671
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicer.java:
##########
@@ -59,91 +55,193 @@ public boolean canSliceDynamic(InputSpec inputSpec)
public List<InputSlice> sliceStatic(InputSpec inputSpec, int maxNumSlices)
{
final ExternalInputSpec externalInputSpec = (ExternalInputSpec) inputSpec;
- final InputSource inputSource = externalInputSpec.getInputSource();
- final InputFormat inputFormat = externalInputSpec.getInputFormat();
- final RowSignature signature = externalInputSpec.getSignature();
-
- // Worker number -> input source for that worker.
- final List<List<InputSource>> workerInputSourcess;
-
- // Figure out input splits for each worker.
- if (inputSource.isSplittable()) {
- //noinspection unchecked
- final SplittableInputSource<Object> splittableInputSource =
(SplittableInputSource<Object>) inputSource;
-
- try {
- workerInputSourcess = SlicerUtils.makeSlices(
- splittableInputSource.createSplits(inputFormat,
FilePerSplitHintSpec.INSTANCE)
- .map(splittableInputSource::withSplit)
- .iterator(),
- maxNumSlices
- );
- }
- catch (IOException e) {
- throw new RuntimeException(e);
- }
- } else {
- workerInputSourcess =
Collections.singletonList(Collections.singletonList(inputSource));
- }
- // Sanity check. It is a bug in this method if this exception is ever
thrown.
- if (workerInputSourcess.size() > maxNumSlices) {
- throw new ISE("Generated too many slices [%d > %d]",
workerInputSourcess.size(), maxNumSlices);
+ if (externalInputSpec.getInputSource().isSplittable()) {
+ return sliceSplittableInputSource(
+ externalInputSpec,
+ new StaticSplitHintSpec(maxNumSlices),
+ maxNumSlices
+ );
+ } else {
+ return sliceUnsplittableInputSource(externalInputSpec);
}
-
- return IntStream.range(0, maxNumSlices)
- .mapToObj(
- workerNumber -> {
- final List<InputSource> workerInputSources;
-
- if (workerNumber < workerInputSourcess.size()) {
- workerInputSources =
workerInputSourcess.get(workerNumber);
- } else {
- workerInputSources = Collections.emptyList();
- }
-
- if (workerInputSources.isEmpty()) {
- return NilInputSlice.INSTANCE;
- } else {
- return new ExternalInputSlice(workerInputSources,
inputFormat, signature);
- }
- }
- )
- .collect(Collectors.toList());
}
@Override
public List<InputSlice> sliceDynamic(
- InputSpec inputSpec,
- int maxNumSlices,
- int maxFilesPerSlice,
- long maxBytesPerSlice
+ final InputSpec inputSpec,
+ final int maxNumSlices,
+ final int maxFilesPerSlice,
+ final long maxBytesPerSlice
)
{
final ExternalInputSpec externalInputSpec = (ExternalInputSpec) inputSpec;
- if (!externalInputSpec.getInputSource().isSplittable()) {
- return sliceStatic(inputSpec, 1);
+ if (externalInputSpec.getInputSource().isSplittable()) {
+ return sliceSplittableInputSource(
+ externalInputSpec,
+ new DynamicSplitHintSpec(maxNumSlices, maxFilesPerSlice,
maxBytesPerSlice),
+ maxNumSlices
+ );
+ } else {
+ return sliceUnsplittableInputSource(externalInputSpec);
}
+ }
- final SplittableInputSource<?> inputSource = (SplittableInputSource<?>)
externalInputSpec.getInputSource();
- final MaxSizeSplitHintSpec maxSizeSplitHintSpec = new MaxSizeSplitHintSpec(
- new HumanReadableBytes(maxBytesPerSlice),
- maxFilesPerSlice
+ /**
+ * "Slice" an unsplittable input source into a single slice.
+ */
+ private static List<InputSlice> sliceUnsplittableInputSource(final
ExternalInputSpec inputSpec)
+ {
+ return Collections.singletonList(
+ new ExternalInputSlice(
+ Collections.singletonList(inputSpec.getInputSource()),
+ inputSpec.getInputFormat(),
+ inputSpec.getSignature()
+ )
);
+ }
+
+ /**
+ * Slice a {@link SplittableInputSource} using a {@link SplitHintSpec}.
+ */
+ private static List<InputSlice> sliceSplittableInputSource(
+ final ExternalInputSpec inputSpec,
+ final SplitHintSpec splitHintSpec,
+ final int maxNumSlices
Review Comment:
> Also, shouldn't a split be based on size, not number?
The size is part of it; the file size- and count-related logic is
encoded in the `SplitHintSpec`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]