rdblue commented on a change in pull request #204: Combine tasks to scan up to
target split size using parquet row group information
URL: https://github.com/apache/incubator-iceberg/pull/204#discussion_r290078022
##########
File path: core/src/main/java/org/apache/iceberg/BaseFileScanTask.java
##########
@@ -91,29 +93,53 @@ public String toString() {
.toString();
}
+
+ /**
+ * This iterator returns {@link FileScanTask} using guidance provided by
split offsets.
+ */
@VisibleForTesting
- static final class OffsetsBasedSplitScanTaskIterator implements
Iterator<FileScanTask> {
- private final List<Long> splitOffsets;
+ static final class OffsetsAwareTargetSplitSizeScanTaskIterator implements
Iterator<FileScanTask> {
+ private final List<Long> offsets;
+ private final List<Long> splitSizes;
private final FileScanTask parentScanTask;
- private int idx = 0;
-
- OffsetsBasedSplitScanTaskIterator(List<Long> splitOffsets, FileScanTask
fileScanTask) {
- this.splitOffsets = splitOffsets;
- this.parentScanTask = fileScanTask;
+ private final long targetSplitSize;
+ private int offsetIdx = 0;
+ private int sizeIdx = 0;
+
+ OffsetsAwareTargetSplitSizeScanTaskIterator(
+ List<Long> offsetList, FileScanTask parentScanTask, long
targetSplitSize) {
+ this.offsets = ImmutableList.copyOf(offsetList);
+ this.parentScanTask = parentScanTask;
+ this.targetSplitSize = targetSplitSize;
+ this.splitSizes = new ArrayList<>(offsets.size());
+ int lastIndex = offsets.size() - 1;
+ for (int index = 0; index < lastIndex; index++) {
+ splitSizes.add(offsets.get(index + 1) - offsets.get(index));
+ }
+ splitSizes.add(parentScanTask.length() - offsets.get(lastIndex));
}
@Override
public boolean hasNext() {
- return idx < splitOffsets.size();
+ return sizeIdx < splitSizes.size();
}
@Override
public FileScanTask next() {
- long start = splitOffsets.get(idx);
- idx++;
- long end = hasNext() ? splitOffsets.get(idx) : parentScanTask.length();
- return new SplitScanTask(start, end - start, parentScanTask);
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ long currentSize = splitSizes.get(sizeIdx);
+ sizeIdx += 1; // always consume at least one file split
+ while (sizeIdx < splitSizes.size() && currentSize +
splitSizes.get(sizeIdx) <= targetSplitSize) {
+ currentSize += splitSizes.get(sizeIdx);
+ sizeIdx += 1;
+ }
+ FileScanTask combinedTask = new SplitScanTask(offsets.get(offsetIdx),
currentSize, parentScanTask);
+ offsetIdx = sizeIdx;
+ return combinedTask;
}
+
Review comment:
Nit: this adds another unnecessary blank line.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]