rdblue commented on a change in pull request #204: Combine tasks to scan up to
target split size using parquet row group information
URL: https://github.com/apache/incubator-iceberg/pull/204#discussion_r290073668
##########
File path: core/src/main/java/org/apache/iceberg/BaseFileScanTask.java
##########
@@ -91,28 +93,66 @@ public String toString() {
.toString();
}
+
+ /**
+ * This iterator returns {@link FileScanTask} using guidance provided by
split offsets.
+ */
@VisibleForTesting
- static final class OffsetsBasedSplitScanTaskIterator implements
Iterator<FileScanTask> {
- private final List<Long> splitOffsets;
+ static final class OffsetsAwareTargetSplitSizeScanTaskIterator implements
Iterator<FileScanTask> {
+ private final List<Long> offsets;
+ private final List<Long> splitSizes;
private final FileScanTask parentScanTask;
- private int idx = 0;
-
- OffsetsBasedSplitScanTaskIterator(List<Long> splitOffsets, FileScanTask
fileScanTask) {
- this.splitOffsets = splitOffsets;
- this.parentScanTask = fileScanTask;
+ private final long targetSplitSize;
+ private int offsetIdx = 0;
+ private int sizeIdx = 0;
+
+ OffsetsAwareTargetSplitSizeScanTaskIterator(
+ List<Long> offsetList,
+ FileScanTask parentScanTask,
+ long targetSplitSize
+ ) {
+ this.offsets = ImmutableList.copyOf(offsetList);
+ this.parentScanTask = parentScanTask;
+ this.targetSplitSize = targetSplitSize;
+ this.splitSizes = new ArrayList<>(offsetList.size());
+ int idx = 0;
+ while (idx < offsets.size()) {
+ splitSizes.add(getSplitSize(idx));
+ idx++;
+ }
}
@Override
public boolean hasNext() {
- return idx < splitOffsets.size();
+ return sizeIdx < splitSizes.size();
}
@Override
public FileScanTask next() {
- long start = splitOffsets.get(idx);
- idx++;
- long end = hasNext() ? splitOffsets.get(idx) : parentScanTask.length();
- return new SplitScanTask(start, end - start, parentScanTask);
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ // We always pick the current split even if it potentially exceeds the
target split size
+ long currentSize = splitSizes.get(sizeIdx);
+ FileScanTask combinedTask;
+ sizeIdx++;
+ while (hasNext()) {
+ if (currentSize + splitSizes.get(sizeIdx) <= targetSplitSize) {
+ currentSize += splitSizes.get(sizeIdx);
+ sizeIdx++;
+ } else {
+ combinedTask = new SplitScanTask(offsets.get(offsetIdx),
currentSize, parentScanTask);
+ offsetIdx = sizeIdx;
+ return combinedTask;
+ }
+ }
+ combinedTask = new SplitScanTask(offsets.get(offsetIdx), currentSize,
parentScanTask);
Review comment:
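This final return doesn't update `offsetIdx`, and it isn't obvious at first why
that is okay (it is safe only because `splitSizes` is exhausted at that point, so
`next()` will throw before it reads `offsetIdx` again). I think it would be better
to simplify this logic a little bit so that there is only one return statement.
Like this: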
```java
long currentSize = splitSizes.get(sizeIdx);
sizeIdx += 1; // always consume at least one file split
while (sizeIdx < splitSizes.size() && currentSize + splitSizes.get(sizeIdx)
<= targetSplitSize) {
currentSize += splitSizes.get(sizeIdx);
sizeIdx += 1;
}
FileScanTask combinedTask = new SplitScanTask(offsets.get(offsetIdx), currentSize,
parentScanTask);
offsetIdx = sizeIdx;
return combinedTask;
```
That way, the behavior is always the same for all splits.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]