rdsr commented on a change in pull request #111: Split files when planning scan tasks
URL: https://github.com/apache/incubator-iceberg/pull/111#discussion_r260403287
 
 

 ##########
 File path: core/src/main/java/com/netflix/iceberg/BaseTableScan.java
 ##########
 @@ -231,4 +234,69 @@ public String toString() {
         .add("filter", rowFilter)
         .toString();
   }
+
+  private CloseableIterable<FileScanTask> planSplits(long splitSize) {
+    final CloseableIterable<FileScanTask> fileScanTasks = planFiles();
+    final Iterable<FileScanTask> splitTasks = FluentIterable
+        .from(fileScanTasks)
+        .transformAndConcat(input -> getSplits(input, splitSize));
+    // Capture manifests which can be closed after scan planning
+    final CloseableIterable<FileScanTask> closeableSplitTasks = CloseableIterable
+        .combine(splitTasks, ImmutableList.of(fileScanTasks));
+    return closeableSplitTasks;
+  }
+
+  private static Iterable<FileScanTask> getSplits(FileScanTask fileScanTask, long splitSize) {
+    return () -> new Iterator<FileScanTask>() {
+      long offset = 0;
+      long remainingLen = fileScanTask.length();
+
+      @Override
+      public boolean hasNext() {
+        return remainingLen > 0;
+      }
+
+      @Override
+      public FileScanTask next() {
+        long len = Math.min(splitSize, remainingLen);
+        final FileScanTask splitTask = splitTask(offset, len, fileScanTask);
+        offset += len;
+        remainingLen -= len;
+        return splitTask;
+      }
+    };
+  }
+
+  /**
+   * Creates a split from a given {@link FileScanTask}.
+   * The split is a {@link FileScanTask} which starts from `offset` and has length `len`.
+   */
+  private static FileScanTask splitTask(long offset, long len, FileScanTask fileScanTask) {
 
 Review comment:
   `BaseFileScanTask` has different input parameters in its constructor: it
takes a schema and a partition spec in string form and parses them. What I
needed here was a simple delegate that overrides the offset and length and, for
every other field, delegates to the underlying `FileScanTask`.
   
   Maybe a `copy` method, something very similar to
https://kotlinlang.org/docs/reference/data-classes.html#copying, is what I need,
but I was hesitant to make these changes in `BaseFileScanTask`.
   
   I can change this if you feel that's the right thing to do.
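   
   To make the delegate idea concrete, here is a minimal sketch. It does not
use the real Iceberg types: `ScanTask`, `WholeFileTask`, and `SplitScanTask` are
hypothetical, simplified stand-ins, and the real `FileScanTask` exposes more
than is shown here. The sketch also assumes the wrapped task covers a whole
file starting at offset 0.

```java
// Hypothetical, simplified stand-in for a file scan task (assumption:
// the real interface carries more than a path, a start, and a length).
interface ScanTask {
  String path();
  long start();
  long length();
}

// Illustrative task covering an entire file, starting at offset 0.
final class WholeFileTask implements ScanTask {
  private final String path;
  private final long length;

  WholeFileTask(String path, long length) {
    this.path = path;
    this.length = length;
  }

  @Override public String path() { return path; }
  @Override public long start() { return 0L; }
  @Override public long length() { return length; }
}

// Delegate: overrides only start() and length(); every other accessor
// is forwarded to the wrapped task.
final class SplitScanTask implements ScanTask {
  private final ScanTask delegate;
  private final long offset;
  private final long len;

  SplitScanTask(ScanTask delegate, long offset, long len) {
    // Reject splits that fall outside the wrapped task's byte range.
    if (offset < 0 || len <= 0 || offset + len > delegate.length()) {
      throw new IllegalArgumentException(
          "Invalid split: offset=" + offset + ", len=" + len);
    }
    this.delegate = delegate;
    this.offset = offset;
    this.len = len;
  }

  @Override public String path() { return delegate.path(); }
  @Override public long start() { return offset; }
  @Override public long length() { return len; }
}
```

   With this shape, `new SplitScanTask(task, offset, len)` plays the role of
`splitTask(offset, len, fileScanTask)` in the diff, and nothing in the wrapped
class has to change. A `copy`-style method would instead live on the task class
itself, which is the change I was hesitant to make.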
