paul-rogers commented on code in PR #13955:
URL: https://github.com/apache/druid/pull/13955#discussion_r1157830886
##########
extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java:
##########
@@ -79,73 +71,63 @@ protected InputEntity createEntity(CloudObjectLocation
location)
return new GoogleCloudStorageEntity(storage, location);
}
- @Override
- protected Stream<InputSplit<List<CloudObjectLocation>>>
getPrefixesSplitStream(@Nonnull SplitHintSpec splitHintSpec)
- {
- final Iterator<List<StorageObject>> splitIterator = splitHintSpec.split(
- storageObjectIterable().iterator(),
- storageObject -> {
- final BigInteger sizeInBigInteger = storageObject.getSize();
- long sizeInLong;
- if (sizeInBigInteger == null) {
- sizeInLong = Long.MAX_VALUE;
- } else {
- try {
- sizeInLong = sizeInBigInteger.longValueExact();
- }
- catch (ArithmeticException e) {
- LOG.warn(
- e,
- "The object [%s, %s] has a size [%s] out of the range of the
long type. "
- + "The max long value will be used for its size instead.",
- storageObject.getBucket(),
- storageObject.getName(),
- sizeInBigInteger
- );
- sizeInLong = Long.MAX_VALUE;
- }
- }
- return new InputFileAttribute(sizeInLong);
- }
- );
-
- return Streams.sequentialStreamFrom(splitIterator)
- .map(objects ->
objects.stream().map(this::byteSourceFromStorageObject).collect(Collectors.toList()))
- .map(InputSplit::new);
- }
-
@Override
public SplittableInputSource<List<CloudObjectLocation>>
withSplit(InputSplit<List<CloudObjectLocation>> split)
{
return new GoogleCloudStorageInputSource(storage, inputDataConfig, null,
null, split.get(), getObjectGlob());
}
- private CloudObjectLocation byteSourceFromStorageObject(final StorageObject
storageObject)
+ @Override
+ protected CloudObjectSplitWidget getSplitWidget()
{
- return GoogleUtils.objectToCloudObjectLocation(storageObject);
+ class SplitWidget implements CloudObjectSplitWidget
+ {
+ @Override
+ public Iterator<LocationWithSize>
getDescriptorIteratorForPrefixes(List<URI> prefixes)
+ {
+ return Iterators.transform(
+ GoogleUtils.lazyFetchingStorageObjectsIterator(
+ storage,
+ prefixes.iterator(),
+ inputDataConfig.getMaxListingLength()
Review Comment:
Thanks for the explanation.
##########
extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java:
##########
@@ -79,73 +71,63 @@ protected InputEntity createEntity(CloudObjectLocation
location)
return new GoogleCloudStorageEntity(storage, location);
}
- @Override
- protected Stream<InputSplit<List<CloudObjectLocation>>>
getPrefixesSplitStream(@Nonnull SplitHintSpec splitHintSpec)
- {
- final Iterator<List<StorageObject>> splitIterator = splitHintSpec.split(
- storageObjectIterable().iterator(),
- storageObject -> {
- final BigInteger sizeInBigInteger = storageObject.getSize();
- long sizeInLong;
- if (sizeInBigInteger == null) {
- sizeInLong = Long.MAX_VALUE;
- } else {
- try {
- sizeInLong = sizeInBigInteger.longValueExact();
- }
- catch (ArithmeticException e) {
- LOG.warn(
- e,
- "The object [%s, %s] has a size [%s] out of the range of the
long type. "
- + "The max long value will be used for its size instead.",
- storageObject.getBucket(),
- storageObject.getName(),
- sizeInBigInteger
- );
- sizeInLong = Long.MAX_VALUE;
- }
- }
- return new InputFileAttribute(sizeInLong);
- }
- );
-
- return Streams.sequentialStreamFrom(splitIterator)
- .map(objects ->
objects.stream().map(this::byteSourceFromStorageObject).collect(Collectors.toList()))
- .map(InputSplit::new);
- }
-
@Override
public SplittableInputSource<List<CloudObjectLocation>>
withSplit(InputSplit<List<CloudObjectLocation>> split)
{
return new GoogleCloudStorageInputSource(storage, inputDataConfig, null,
null, split.get(), getObjectGlob());
}
- private CloudObjectLocation byteSourceFromStorageObject(final StorageObject
storageObject)
+ @Override
+ protected CloudObjectSplitWidget getSplitWidget()
{
- return GoogleUtils.objectToCloudObjectLocation(storageObject);
+ class SplitWidget implements CloudObjectSplitWidget
+ {
+ @Override
+ public Iterator<LocationWithSize>
getDescriptorIteratorForPrefixes(List<URI> prefixes)
+ {
+ return Iterators.transform(
+ GoogleUtils.lazyFetchingStorageObjectsIterator(
+ storage,
+ prefixes.iterator(),
+ inputDataConfig.getMaxListingLength()
+ ),
+ object -> new LocationWithSize(object.getBucket(),
object.getName(), getSize(object))
+ );
+ }
+
+ @Override
+ public long getObjectSize(CloudObjectLocation location) throws
IOException
+ {
+ final StorageObject storageObject =
storage.getMetadata(location.getBucket(), location.getPath());
+ return getSize(storageObject);
+ }
+ }
+
+ return new SplitWidget();
}
- private Iterable<StorageObject> storageObjectIterable()
+ private static long getSize(final StorageObject object)
{
- return () -> {
- Iterator<StorageObject> iterator =
GoogleUtils.lazyFetchingStorageObjectsIterator(
- storage,
- getPrefixes().iterator(),
- inputDataConfig.getMaxListingLength()
- );
+ final BigInteger sizeInBigInteger = object.getSize();
- // Skip files that didn't match glob filter.
- if (StringUtils.isNotBlank(getObjectGlob())) {
- PathMatcher m = FileSystems.getDefault().getPathMatcher("glob:" +
getObjectGlob());
-
- iterator = Iterators.filter(
- iterator,
- object -> m.matches(Paths.get(object.getName()))
+ if (sizeInBigInteger == null) {
+ return Long.MAX_VALUE;
Review Comment:
Makes sense. Thanks for the explanation.
##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java:
##########
@@ -19,14 +19,369 @@
package org.apache.druid.msq.kernel.controller;
+import com.google.common.collect.ImmutableMap;
+import it.unimi.dsi.fastutil.ints.Int2IntMaps;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.longs.LongList;
import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.msq.input.InputSlice;
+import org.apache.druid.msq.input.InputSpec;
+import org.apache.druid.msq.input.InputSpecSlicer;
+import org.apache.druid.msq.input.NilInputSlice;
+import org.apache.druid.msq.input.SlicerUtils;
+import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
+import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory;
+import org.junit.Assert;
import org.junit.Test;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
public class WorkerInputsTest
{
+ private static final String QUERY_ID = "myQuery";
+
+ @Test
+ public void test_max_threeInputs_fourWorkers()
+ {
+ final StageDefinition stageDef =
+ StageDefinition.builder(0)
+ .inputs(new TestInputSpec(1, 2, 3))
+ .maxWorkerCount(4)
+ .processorFactory(new
OffsetLimitFrameProcessorFactory(0, 0L))
+ .build(QUERY_ID);
+
+ final WorkerInputs inputs = WorkerInputs.create(
+ stageDef,
+ Int2IntMaps.EMPTY_MAP,
+ new TestInputSpecSlicer(true),
+ WorkerAssignmentStrategy.MAX
+ );
+
+ Assert.assertEquals(
+ ImmutableMap.<Integer, List<InputSlice>>builder()
+ .put(0, Collections.singletonList(new TestInputSlice(1)))
+ .put(1, Collections.singletonList(new TestInputSlice(2)))
+ .put(2, Collections.singletonList(new TestInputSlice(3)))
+ .put(3, Collections.singletonList(new TestInputSlice()))
+ .build(),
+ inputs.assignmentsMap()
+ );
+ }
+
+ @Test
+ public void test_max_zeroInputs_fourWorkers()
+ {
+ final StageDefinition stageDef =
+ StageDefinition.builder(0)
+ .inputs(new TestInputSpec())
+ .maxWorkerCount(4)
+ .processorFactory(new
OffsetLimitFrameProcessorFactory(0, 0L))
+ .build(QUERY_ID);
+
+ final WorkerInputs inputs = WorkerInputs.create(
+ stageDef,
+ Int2IntMaps.EMPTY_MAP,
+ new TestInputSpecSlicer(true),
+ WorkerAssignmentStrategy.MAX
+ );
+
+ Assert.assertEquals(
+ ImmutableMap.<Integer, List<InputSlice>>builder()
+ .put(0, Collections.singletonList(new TestInputSlice()))
+ .put(1, Collections.singletonList(new TestInputSlice()))
+ .put(2, Collections.singletonList(new TestInputSlice()))
+ .put(3, Collections.singletonList(new TestInputSlice()))
+ .build(),
+ inputs.assignmentsMap()
+ );
+ }
+
+ @Test
+ public void test_auto_zeroInputSpecs_fourWorkers()
+ {
+ final StageDefinition stageDef =
+ StageDefinition.builder(0)
+ .inputs()
+ .maxWorkerCount(4)
+ .processorFactory(new
OffsetLimitFrameProcessorFactory(0, 0L))
+ .build(QUERY_ID);
+
+ final WorkerInputs inputs = WorkerInputs.create(
+ stageDef,
+ Int2IntMaps.EMPTY_MAP,
+ new TestInputSpecSlicer(true),
+ WorkerAssignmentStrategy.AUTO
+ );
+
+ Assert.assertEquals(
+ ImmutableMap.<Integer, List<InputSlice>>builder()
+ .put(0, Collections.singletonList(NilInputSlice.INSTANCE))
+ .build(),
+ inputs.assignmentsMap()
+ );
+ }
+
+ @Test
+ public void test_auto_zeroInputSlices_fourWorkers()
+ {
+ final StageDefinition stageDef =
+ StageDefinition.builder(0)
+ .inputs(new TestInputSpec())
+ .maxWorkerCount(4)
+ .processorFactory(new
OffsetLimitFrameProcessorFactory(0, 0L))
+ .build(QUERY_ID);
+
+ final WorkerInputs inputs = WorkerInputs.create(
+ stageDef,
+ Int2IntMaps.EMPTY_MAP,
+ new TestInputSpecSlicer(true),
+ WorkerAssignmentStrategy.AUTO
+ );
+
+ Assert.assertEquals(
+ ImmutableMap.<Integer, List<InputSlice>>builder()
+ .put(0, Collections.singletonList(NilInputSlice.INSTANCE))
+ .build(),
+ inputs.assignmentsMap()
+ );
+ }
+
+ @Test
+ public void test_auto_zeroInputSlices_broadcast_fourWorkers()
+ {
+ final StageDefinition stageDef =
+ StageDefinition.builder(0)
+ .inputs(new TestInputSpec())
+ .broadcastInputs(IntSet.of(0))
+ .maxWorkerCount(4)
+ .processorFactory(new
OffsetLimitFrameProcessorFactory(0, 0L))
+ .build(QUERY_ID);
+
+ final WorkerInputs inputs = WorkerInputs.create(
+ stageDef,
+ Int2IntMaps.EMPTY_MAP,
+ new TestInputSpecSlicer(true),
+ WorkerAssignmentStrategy.AUTO
+ );
+
+ Assert.assertEquals(
+ ImmutableMap.<Integer, List<InputSlice>>builder()
+ .put(0, Collections.singletonList(new TestInputSlice()))
+ .build(),
+ inputs.assignmentsMap()
+ );
+ }
+
+ @Test
+ public void test_auto_threeInputs_fourWorkers()
+ {
+ final StageDefinition stageDef =
+ StageDefinition.builder(0)
+ .inputs(new TestInputSpec(1, 2, 3))
+ .maxWorkerCount(4)
+ .processorFactory(new
OffsetLimitFrameProcessorFactory(0, 0L))
+ .build(QUERY_ID);
+
+ final WorkerInputs inputs = WorkerInputs.create(
+ stageDef,
+ Int2IntMaps.EMPTY_MAP,
+ new TestInputSpecSlicer(true),
+ WorkerAssignmentStrategy.AUTO
+ );
+
+ Assert.assertEquals(
+ ImmutableMap.<Integer, List<InputSlice>>builder()
+ .put(0, Collections.singletonList(new TestInputSlice(1, 2,
3)))
+ .build(),
+ inputs.assignmentsMap()
+ );
+ }
+
+ @Test
+ public void test_auto_threeBigInputs_fourWorkers()
+ {
+ final StageDefinition stageDef =
+ StageDefinition.builder(0)
+ .inputs(new TestInputSpec(4_000_000_000L,
4_000_000_001L, 4_000_000_002L))
+ .maxWorkerCount(4)
+ .processorFactory(new
OffsetLimitFrameProcessorFactory(0, 0L))
+ .build(QUERY_ID);
+
+ final WorkerInputs inputs = WorkerInputs.create(
+ stageDef,
+ Int2IntMaps.EMPTY_MAP,
+ new TestInputSpecSlicer(true),
+ WorkerAssignmentStrategy.AUTO
+ );
+
+ Assert.assertEquals(
+ ImmutableMap.<Integer, List<InputSlice>>builder()
+ .put(0, Collections.singletonList(new
TestInputSlice(4_000_000_000L, 4_000_000_001L)))
+ .put(1, Collections.singletonList(new
TestInputSlice(4_000_000_002L)))
+ .build(),
+ inputs.assignmentsMap()
+ );
+ }
+
+ @Test
+ public void test_auto_threeBigInputs_oneWorker()
Review Comment:
Those two comments were trying to work out the limits of the split
algorithm. What happens if there is more data than fits into the available
number of splits, each filled to the maximum size? Or, is there a maximum size
per split? If there is more data, do we keep adding more to the existing splits
round-robin?
##########
extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorage.java:
##########
@@ -66,6 +67,11 @@ public InputStream get(final String bucket, final String
path, long start) throw
return inputStream;
}
+ public StorageObject getMetadata(final String bucket, final String path)
throws IOException
+ {
+ return storage.get().objects().get(bucket, path).execute();
Review Comment:
Thanks for the explanation.
##########
extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/data/input/aliyun/OssInputSourceTest.java:
##########
@@ -318,6 +324,13 @@ public void testSerdeWithOtherOtherInvalidArgs()
@Test
public void testWithUrisSplit()
{
+ EasyMock.reset(OSSCLIENT);
Review Comment:
Perhaps I was mistaken — I recall seeing something about a file limit,
but going back now I don't see anything. So I guess we're good.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicer.java:
##########
@@ -59,91 +55,193 @@ public boolean canSliceDynamic(InputSpec inputSpec)
public List<InputSlice> sliceStatic(InputSpec inputSpec, int maxNumSlices)
{
final ExternalInputSpec externalInputSpec = (ExternalInputSpec) inputSpec;
- final InputSource inputSource = externalInputSpec.getInputSource();
- final InputFormat inputFormat = externalInputSpec.getInputFormat();
- final RowSignature signature = externalInputSpec.getSignature();
-
- // Worker number -> input source for that worker.
- final List<List<InputSource>> workerInputSourcess;
-
- // Figure out input splits for each worker.
- if (inputSource.isSplittable()) {
- //noinspection unchecked
- final SplittableInputSource<Object> splittableInputSource =
(SplittableInputSource<Object>) inputSource;
-
- try {
- workerInputSourcess = SlicerUtils.makeSlices(
- splittableInputSource.createSplits(inputFormat,
FilePerSplitHintSpec.INSTANCE)
- .map(splittableInputSource::withSplit)
- .iterator(),
- maxNumSlices
- );
- }
- catch (IOException e) {
- throw new RuntimeException(e);
- }
- } else {
- workerInputSourcess =
Collections.singletonList(Collections.singletonList(inputSource));
- }
- // Sanity check. It is a bug in this method if this exception is ever
thrown.
- if (workerInputSourcess.size() > maxNumSlices) {
- throw new ISE("Generated too many slices [%d > %d]",
workerInputSourcess.size(), maxNumSlices);
+ if (externalInputSpec.getInputSource().isSplittable()) {
+ return sliceSplittableInputSource(
+ externalInputSpec,
+ new StaticSplitHintSpec(maxNumSlices),
+ maxNumSlices
+ );
+ } else {
+ return sliceUnsplittableInputSource(externalInputSpec);
}
-
- return IntStream.range(0, maxNumSlices)
- .mapToObj(
- workerNumber -> {
- final List<InputSource> workerInputSources;
-
- if (workerNumber < workerInputSourcess.size()) {
- workerInputSources =
workerInputSourcess.get(workerNumber);
- } else {
- workerInputSources = Collections.emptyList();
- }
-
- if (workerInputSources.isEmpty()) {
- return NilInputSlice.INSTANCE;
- } else {
- return new ExternalInputSlice(workerInputSources,
inputFormat, signature);
- }
- }
- )
- .collect(Collectors.toList());
}
@Override
public List<InputSlice> sliceDynamic(
- InputSpec inputSpec,
- int maxNumSlices,
- int maxFilesPerSlice,
- long maxBytesPerSlice
+ final InputSpec inputSpec,
+ final int maxNumSlices,
+ final int maxFilesPerSlice,
+ final long maxBytesPerSlice
)
{
final ExternalInputSpec externalInputSpec = (ExternalInputSpec) inputSpec;
- if (!externalInputSpec.getInputSource().isSplittable()) {
- return sliceStatic(inputSpec, 1);
+ if (externalInputSpec.getInputSource().isSplittable()) {
+ return sliceSplittableInputSource(
+ externalInputSpec,
+ new DynamicSplitHintSpec(maxNumSlices, maxFilesPerSlice,
maxBytesPerSlice),
+ maxNumSlices
+ );
+ } else {
+ return sliceUnsplittableInputSource(externalInputSpec);
}
+ }
- final SplittableInputSource<?> inputSource = (SplittableInputSource<?>)
externalInputSpec.getInputSource();
- final MaxSizeSplitHintSpec maxSizeSplitHintSpec = new MaxSizeSplitHintSpec(
- new HumanReadableBytes(maxBytesPerSlice),
- maxFilesPerSlice
+ /**
+ * "Slice" an unsplittable input source into a single slice.
+ */
+ private static List<InputSlice> sliceUnsplittableInputSource(final
ExternalInputSpec inputSpec)
+ {
+ return Collections.singletonList(
+ new ExternalInputSlice(
+ Collections.singletonList(inputSpec.getInputSource()),
+ inputSpec.getInputFormat(),
+ inputSpec.getSignature()
+ )
);
+ }
+
+ /**
+ * Slice a {@link SplittableInputSource} using a {@link SplitHintSpec}.
+ */
+ private static List<InputSlice> sliceSplittableInputSource(
+ final ExternalInputSpec inputSpec,
+ final SplitHintSpec splitHintSpec,
+ final int maxNumSlices
Review Comment:
Thanks for the explanations!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]