rdblue commented on a change in pull request #204: Combine tasks to scan up to
target split size using parquet row group information
URL: https://github.com/apache/incubator-iceberg/pull/204#discussion_r290071641
##########
File path: core/src/main/java/org/apache/iceberg/BaseFileScanTask.java
##########
@@ -91,28 +93,66 @@ public String toString() {
.toString();
}
+
+ /**
+ * This iterator returns {@link FileScanTask} using guidance provided by
split offsets.
+ */
@VisibleForTesting
- static final class OffsetsBasedSplitScanTaskIterator implements
Iterator<FileScanTask> {
- private final List<Long> splitOffsets;
+ static final class OffsetsAwareTargetSplitSizeScanTaskIterator implements
Iterator<FileScanTask> {
+ private final List<Long> offsets;
+ private final List<Long> splitSizes;
private final FileScanTask parentScanTask;
- private int idx = 0;
-
- OffsetsBasedSplitScanTaskIterator(List<Long> splitOffsets, FileScanTask
fileScanTask) {
- this.splitOffsets = splitOffsets;
- this.parentScanTask = fileScanTask;
+ private final long targetSplitSize;
+ private int offsetIdx = 0;
+ private int sizeIdx = 0;
+
+ OffsetsAwareTargetSplitSizeScanTaskIterator(
+ List<Long> offsetList,
+ FileScanTask parentScanTask,
+ long targetSplitSize
+ ) {
+ this.offsets = ImmutableList.copyOf(offsetList);
+ this.parentScanTask = parentScanTask;
+ this.targetSplitSize = targetSplitSize;
+ this.splitSizes = new ArrayList<>(offsetList.size());
+ int idx = 0;
+ while (idx < offsets.size()) {
+ splitSizes.add(getSplitSize(idx));
+ idx++;
+ }
}
@Override
public boolean hasNext() {
- return idx < splitOffsets.size();
+ return sizeIdx < splitSizes.size();
}
@Override
public FileScanTask next() {
- long start = splitOffsets.get(idx);
- idx++;
- long end = hasNext() ? splitOffsets.get(idx) : parentScanTask.length();
- return new SplitScanTask(start, end - start, parentScanTask);
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ // We always pick the current split even if it potentially exceeds the
target split size
+ long currentSize = splitSizes.get(sizeIdx);
+ FileScanTask combinedTask;
+ sizeIdx++;
+ while (hasNext()) {
+ if (currentSize + splitSizes.get(sizeIdx) <= targetSplitSize) {
+ currentSize += splitSizes.get(sizeIdx);
+ sizeIdx++;
+ } else {
+ combinedTask = new SplitScanTask(offsets.get(offsetIdx),
currentSize, parentScanTask);
+ offsetIdx = sizeIdx;
+ return combinedTask;
+ }
+ }
+ combinedTask = new SplitScanTask(offsets.get(offsetIdx), currentSize,
parentScanTask);
+ return combinedTask;
+ }
+
+ private long getSplitSize(int idx) {
Review comment:
If this were left in the for loop, you could handle the last offset outside
the loop instead of using the check here:
```java
int lastIndex = offsets.size() - 1
for (int index = 0; index < lastIndex; index += 1) {
splitSizes.add(offsets.get(index + 1) - offsets.get(index));
}
splitSizes.add(parentScanTask.length() - offsets.get(lastIndex));
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]