This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 462bd31  Split Parquet tasks at row group boundaries (#188)
462bd31 is described below

commit 462bd310a274093daba1edef00eefb236c20ae62
Author: Samarth Jain <[email protected]>
AuthorDate: Fri May 17 12:19:29 2019 -0700

    Split Parquet tasks at row group boundaries (#188)
---
 api/src/main/java/org/apache/iceberg/DataFile.java |  5 ++-
 .../java/org/apache/iceberg/io/FileAppender.java   |  5 ++-
 .../java/org/apache/iceberg/BaseFileScanTask.java  | 44 ++++++++++++++++++----
 .../java/org/apache/iceberg/MockFileScanTask.java  | 34 +++++++++++++++++
 ...ava => TestFixedSizeSplitScanTaskIterator.java} | 21 ++---------
 ... => TestOffsetsBasedSplitScanTaskIterator.java} | 44 ++++++++++------------
 .../org/apache/iceberg/parquet/ParquetUtil.java    |  7 ++++
 7 files changed, 106 insertions(+), 54 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/DataFile.java b/api/src/main/java/org/apache/iceberg/DataFile.java
index d4c1b67..fed0ac7 100644
--- a/api/src/main/java/org/apache/iceberg/DataFile.java
+++ b/api/src/main/java/org/apache/iceberg/DataFile.java
@@ -139,8 +139,9 @@ public interface DataFile {
   DataFile copy();
 
   /**
-   * @return a list of offsets for file blocks if applicable, null otherwise. When available, this
-   * information is used for planning scan tasks whose boundaries are determined by these offsets.
+   * @return List of recommended split locations, if applicable, null otherwise.
+   * When available, this information is used for planning scan tasks whose boundaries
+   * are determined by these offsets. The returned list must be sorted in ascending order.
    */
   List<Long> splitOffsets();
 }
diff --git a/api/src/main/java/org/apache/iceberg/io/FileAppender.java b/api/src/main/java/org/apache/iceberg/io/FileAppender.java
index 74229cc..5c6cef2 100644
--- a/api/src/main/java/org/apache/iceberg/io/FileAppender.java
+++ b/api/src/main/java/org/apache/iceberg/io/FileAppender.java
@@ -48,8 +48,9 @@ public interface FileAppender<D> extends Closeable {
   long length();
 
   /**
-   * @return a list of offsets for file blocks if applicable, null otherwise. When available, this
-   * information is used for planning scan tasks whose boundaries are determined by these offsets.
+   * @return a list of recommended split locations, if applicable, null otherwise. When available,
+   * this information is used for planning scan tasks whose boundaries are determined by these offsets.
+   * The returned list must be sorted in ascending order.
    * Only valid after the file is closed.
    */
   default List<Long> splitOffsets() {
diff --git a/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java b/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java
index 6f179e4..b90805c 100644
--- a/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java
+++ b/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java
@@ -19,9 +19,11 @@
 
 package org.apache.iceberg;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableList;
 import java.util.Iterator;
+import java.util.List;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.ResidualEvaluator;
 
@@ -71,10 +73,13 @@ class BaseFileScanTask implements FileScanTask {
   @Override
   public Iterable<FileScanTask> split(long splitSize) {
     if (file.format().isSplittable()) {
-      return () -> new SplitScanTaskIterator(splitSize, this);
-    } else {
-      return ImmutableList.of(this);
+      if (file.splitOffsets() != null) {
+        return () -> new OffsetsBasedSplitScanTaskIterator(file.splitOffsets(), this);
+      } else {
+        return () -> new FixedSizeSplitScanTaskIterator(splitSize, this);
+      }
     }
+    return ImmutableList.of(this);
   }
 
   @Override
@@ -86,16 +91,39 @@ class BaseFileScanTask implements FileScanTask {
         .toString();
   }
 
-  /**
-   * Visible for Testing
-   */
-  static final class SplitScanTaskIterator implements Iterator<FileScanTask> {
+  @VisibleForTesting
+  static final class OffsetsBasedSplitScanTaskIterator implements Iterator<FileScanTask> {
+    private final List<Long> splitOffsets;
+    private final FileScanTask parentScanTask;
+    private int idx = 0;
+
+    OffsetsBasedSplitScanTaskIterator(List<Long> splitOffsets, FileScanTask fileScanTask) {
+      this.splitOffsets = splitOffsets;
+      this.parentScanTask = fileScanTask;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return idx < splitOffsets.size();
+    }
+
+    @Override
+    public FileScanTask next() {
+      long start = splitOffsets.get(idx);
+      idx++;
+      long end = hasNext() ? splitOffsets.get(idx) : parentScanTask.length();
+      return new SplitScanTask(start, end - start, parentScanTask);
+    }
+  }
+
+  @VisibleForTesting
+  static final class FixedSizeSplitScanTaskIterator implements Iterator<FileScanTask> {
     private long offset;
     private long remainingLen;
     private long splitSize;
     private final FileScanTask fileScanTask;
 
-    SplitScanTaskIterator(long splitSize, FileScanTask fileScanTask) {
+    FixedSizeSplitScanTaskIterator(long splitSize, FileScanTask fileScanTask) {
       this.offset = 0;
       this.remainingLen = fileScanTask.length();
       this.splitSize = splitSize;
diff --git a/core/src/test/java/org/apache/iceberg/MockFileScanTask.java b/core/src/test/java/org/apache/iceberg/MockFileScanTask.java
new file mode 100644
index 0000000..10b235a
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/MockFileScanTask.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+public class MockFileScanTask extends BaseFileScanTask {
+
+  private final long length;
+
+  public MockFileScanTask(long length) {
+    super(null, null, null, null);
+    this.length = length;
+  }
+
+  @Override
+  public long length() {
+    return length;
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/TestSplitScanTaskIterator.java b/core/src/test/java/org/apache/iceberg/TestFixedSizeSplitScanTaskIterator.java
similarity index 77%
copy from core/src/test/java/org/apache/iceberg/TestSplitScanTaskIterator.java
copy to core/src/test/java/org/apache/iceberg/TestFixedSizeSplitScanTaskIterator.java
index b37caee..bac47e0 100644
--- a/core/src/test/java/org/apache/iceberg/TestSplitScanTaskIterator.java
+++ b/core/src/test/java/org/apache/iceberg/TestFixedSizeSplitScanTaskIterator.java
@@ -24,9 +24,9 @@ import java.util.List;
 import org.junit.Assert;
 import org.junit.Test;
 
-import static org.apache.iceberg.BaseFileScanTask.SplitScanTaskIterator;
+import static org.apache.iceberg.BaseFileScanTask.FixedSizeSplitScanTaskIterator;
 
-public class TestSplitScanTaskIterator {
+public class TestFixedSizeSplitScanTaskIterator {
   @Test
   public void testSplits() {
     verify(15L, 100L, asList(
@@ -37,7 +37,8 @@ public class TestSplitScanTaskIterator {
   }
 
   private static void verify(long splitSize, long fileLen, List<List<Long>> offsetLenPairs) {
-    List<FileScanTask> tasks = Lists.newArrayList(new SplitScanTaskIterator(splitSize, new MockFileScanTask(fileLen)));
+    List<FileScanTask> tasks = Lists.newArrayList(
+        new FixedSizeSplitScanTaskIterator(splitSize, new MockFileScanTask(fileLen)));
     for (int i = 0; i < tasks.size(); i++) {
       FileScanTask task = tasks.get(i);
       List<Long> split = offsetLenPairs.get(i);
@@ -48,20 +49,6 @@ public class TestSplitScanTaskIterator {
     }
   }
 
-  private static class MockFileScanTask extends BaseFileScanTask {
-    private final long length;
-
-    MockFileScanTask(long length) {
-      super(null, null, null, null);
-      this.length = length;
-    }
-
-    @Override
-    public long length() {
-      return length;
-    }
-  }
-
   private <T> List<T> asList(T... items) {
     return Lists.newArrayList(items);
   }
diff --git a/core/src/test/java/org/apache/iceberg/TestSplitScanTaskIterator.java b/core/src/test/java/org/apache/iceberg/TestOffsetsBasedSplitScanTaskIterator.java
similarity index 56%
rename from core/src/test/java/org/apache/iceberg/TestSplitScanTaskIterator.java
rename to core/src/test/java/org/apache/iceberg/TestOffsetsBasedSplitScanTaskIterator.java
index b37caee..0b70ebb 100644
--- a/core/src/test/java/org/apache/iceberg/TestSplitScanTaskIterator.java
+++ b/core/src/test/java/org/apache/iceberg/TestOffsetsBasedSplitScanTaskIterator.java
@@ -24,20 +24,28 @@ import java.util.List;
 import org.junit.Assert;
 import org.junit.Test;
 
-import static org.apache.iceberg.BaseFileScanTask.SplitScanTaskIterator;
-
-public class TestSplitScanTaskIterator {
+public class TestOffsetsBasedSplitScanTaskIterator {
   @Test
   public void testSplits() {
-    verify(15L, 100L, asList(
-        asList(0L, 15L), asList(15L, 15L), asList(30L, 15L), asList(45L, 15L), asList(60L, 15L),
-        asList(75L, 15L), asList(90L, 10L)));
-    verify(10L, 10L, asList(asList(0L, 10L)));
-    verify(20L, 10L, asList(asList(0L, 10L)));
+    // case when the last row group has more than one byte
+    verify(asList(4L, 10L, 15L, 18L, 30L, 45L), 48L, asList(
+        asList(4L, 6L), asList(10L, 5L), asList(15L, 3L), asList(18L, 12L), asList(30L, 15L),
+        asList(45L, 3L)));
+
+    // case when the last row group has 1 byte
+    verify(asList(4L, 10L, 15L, 18L, 30L, 45L), 46L, asList(
+        asList(4L, 6L), asList(10L, 5L), asList(15L, 3L), asList(18L, 12L), asList(30L, 15L),
+        asList(45L, 1L)));
+
+    // case when there is only one row group
+    verify(asList(4L), 48L, asList(
+        asList(4L, 44L)));
   }
 
-  private static void verify(long splitSize, long fileLen, List<List<Long>> offsetLenPairs) {
-    List<FileScanTask> tasks = Lists.newArrayList(new SplitScanTaskIterator(splitSize, new MockFileScanTask(fileLen)));
+  private static void verify(List<Long> offsetRanges, long fileLen, List<List<Long>> offsetLenPairs) {
+    List<FileScanTask> tasks = Lists.newArrayList(
+            new BaseFileScanTask.OffsetsBasedSplitScanTaskIterator(offsetRanges, new MockFileScanTask(fileLen)));
+    Assert.assertEquals("Number of tasks don't match", offsetLenPairs.size(), tasks.size());
     for (int i = 0; i < tasks.size(); i++) {
       FileScanTask task = tasks.get(i);
       List<Long> split = offsetLenPairs.get(i);
@@ -48,21 +56,7 @@ public class TestSplitScanTaskIterator {
     }
   }
 
-  private static class MockFileScanTask extends BaseFileScanTask {
-    private final long length;
-
-    MockFileScanTask(long length) {
-      super(null, null, null, null);
-      this.length = length;
-    }
-
-    @Override
-    public long length() {
-      return length;
-    }
-  }
-
-  private <T> List<T> asList(T... items) {
+  private static <T> List<T> asList(T... items) {
     return Lists.newArrayList(items);
   }
 }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
index ecf9c8d..0bd4154 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
@@ -25,6 +25,7 @@ import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -109,11 +110,17 @@ public class ParquetUtil {
         toBufferMap(fileSchema, lowerBounds), toBufferMap(fileSchema, upperBounds));
   }
 
+
+  /**
+   * @return a list of offsets in ascending order determined by the starting position
+   * of the row groups
+   */
   public static List<Long> getSplitOffsets(ParquetMetadata md) {
     List<Long> splitOffsets = new ArrayList<>(md.getBlocks().size());
     for (BlockMetaData blockMetaData : md.getBlocks()) {
       splitOffsets.add(blockMetaData.getStartingPos());
     }
+    Collections.sort(splitOffsets);
     return ImmutableList.copyOf(splitOffsets);
   }
 

Reply via email to