gianm commented on code in PR #13955:
URL: https://github.com/apache/druid/pull/13955#discussion_r1149809483
##########
extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java:
##########
@@ -79,73 +71,63 @@ protected InputEntity createEntity(CloudObjectLocation location)
return new GoogleCloudStorageEntity(storage, location);
}
- @Override
- protected Stream<InputSplit<List<CloudObjectLocation>>> getPrefixesSplitStream(@Nonnull SplitHintSpec splitHintSpec)
- {
- final Iterator<List<StorageObject>> splitIterator = splitHintSpec.split(
- storageObjectIterable().iterator(),
- storageObject -> {
- final BigInteger sizeInBigInteger = storageObject.getSize();
- long sizeInLong;
- if (sizeInBigInteger == null) {
- sizeInLong = Long.MAX_VALUE;
- } else {
- try {
- sizeInLong = sizeInBigInteger.longValueExact();
- }
- catch (ArithmeticException e) {
- LOG.warn(
- e,
- "The object [%s, %s] has a size [%s] out of the range of the long type. "
- + "The max long value will be used for its size instead.",
- storageObject.getBucket(),
- storageObject.getName(),
- sizeInBigInteger
- );
- sizeInLong = Long.MAX_VALUE;
- }
- }
- return new InputFileAttribute(sizeInLong);
- }
- );
-
- return Streams.sequentialStreamFrom(splitIterator)
- .map(objects -> objects.stream().map(this::byteSourceFromStorageObject).collect(Collectors.toList()))
- .map(InputSplit::new);
- }
-
@Override
public SplittableInputSource<List<CloudObjectLocation>> withSplit(InputSplit<List<CloudObjectLocation>> split)
{
return new GoogleCloudStorageInputSource(storage, inputDataConfig, null, null, split.get(), getObjectGlob());
}
- private CloudObjectLocation byteSourceFromStorageObject(final StorageObject storageObject)
+ @Override
+ protected CloudObjectSplitWidget getSplitWidget()
{
- return GoogleUtils.objectToCloudObjectLocation(storageObject);
+ class SplitWidget implements CloudObjectSplitWidget
+ {
+ @Override
+ public Iterator<LocationWithSize> getDescriptorIteratorForPrefixes(List<URI> prefixes)
+ {
+ return Iterators.transform(
+ GoogleUtils.lazyFetchingStorageObjectsIterator(
+ storage,
+ prefixes.iterator(),
+ inputDataConfig.getMaxListingLength()
Review Comment:
Full disclosure: this logic was pre-existing; I'm just refactoring it a bit.
That being said, after looking briefly at the code for
`GoogleUtils.lazyFetchingStorageObjectsIterator`, `maxListingLength` seems to
be a pagination setting (how many objects are requested per listing call). So
I don't think it changes which results are returned, just how they are fetched.
--
This is an automated message from the Apache Git Service.