This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 462bd31 Split Parquet tasks at row group boundaries (#188)
462bd31 is described below
commit 462bd310a274093daba1edef00eefb236c20ae62
Author: Samarth Jain <[email protected]>
AuthorDate: Fri May 17 12:19:29 2019 -0700
Split Parquet tasks at row group boundaries (#188)
---
api/src/main/java/org/apache/iceberg/DataFile.java | 5 ++-
.../java/org/apache/iceberg/io/FileAppender.java | 5 ++-
.../java/org/apache/iceberg/BaseFileScanTask.java | 44 ++++++++++++++++++----
.../java/org/apache/iceberg/MockFileScanTask.java | 34 +++++++++++++++++
...ava => TestFixedSizeSplitScanTaskIterator.java} | 21 ++---------
... => TestOffsetsBasedSplitScanTaskIterator.java} | 44 ++++++++++------------
.../org/apache/iceberg/parquet/ParquetUtil.java | 7 ++++
7 files changed, 106 insertions(+), 54 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/DataFile.java b/api/src/main/java/org/apache/iceberg/DataFile.java
index d4c1b67..fed0ac7 100644
--- a/api/src/main/java/org/apache/iceberg/DataFile.java
+++ b/api/src/main/java/org/apache/iceberg/DataFile.java
@@ -139,8 +139,9 @@ public interface DataFile {
DataFile copy();
/**
- * @return a list of offsets for file blocks if applicable, null otherwise. When available, this
- * information is used for planning scan tasks whose boundaries are determined by these offsets.
+ * @return List of recommended split locations, if applicable, null otherwise.
+ * When available, this information is used for planning scan tasks whose boundaries
+ * are determined by these offsets. The returned list must be sorted in ascending order.
*/
List<Long> splitOffsets();
}
diff --git a/api/src/main/java/org/apache/iceberg/io/FileAppender.java b/api/src/main/java/org/apache/iceberg/io/FileAppender.java
index 74229cc..5c6cef2 100644
--- a/api/src/main/java/org/apache/iceberg/io/FileAppender.java
+++ b/api/src/main/java/org/apache/iceberg/io/FileAppender.java
@@ -48,8 +48,9 @@ public interface FileAppender<D> extends Closeable {
long length();
/**
- * @return a list of offsets for file blocks if applicable, null otherwise. When available, this
- * information is used for planning scan tasks whose boundaries are determined by these offsets.
+ * @return a list of recommended split locations, if applicable, null otherwise. When available,
+ * this information is used for planning scan tasks whose boundaries are determined by these offsets.
+ * The returned list must be sorted in ascending order.
* Only valid after the file is closed.
*/
default List<Long> splitOffsets() {
diff --git a/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java b/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java
index 6f179e4..b90805c 100644
--- a/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java
+++ b/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java
@@ -19,9 +19,11 @@
package org.apache.iceberg;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
import java.util.Iterator;
+import java.util.List;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.ResidualEvaluator;
@@ -71,10 +73,13 @@ class BaseFileScanTask implements FileScanTask {
@Override
public Iterable<FileScanTask> split(long splitSize) {
if (file.format().isSplittable()) {
- return () -> new SplitScanTaskIterator(splitSize, this);
- } else {
- return ImmutableList.of(this);
+ if (file.splitOffsets() != null) {
+ return () -> new
OffsetsBasedSplitScanTaskIterator(file.splitOffsets(), this);
+ } else {
+ return () -> new FixedSizeSplitScanTaskIterator(splitSize, this);
+ }
}
+ return ImmutableList.of(this);
}
@Override
@@ -86,16 +91,39 @@ class BaseFileScanTask implements FileScanTask {
.toString();
}
- /**
- * Visible for Testing
- */
- static final class SplitScanTaskIterator implements Iterator<FileScanTask> {
+ @VisibleForTesting
+ static final class OffsetsBasedSplitScanTaskIterator implements
Iterator<FileScanTask> {
+ private final List<Long> splitOffsets;
+ private final FileScanTask parentScanTask;
+ private int idx = 0;
+
+ OffsetsBasedSplitScanTaskIterator(List<Long> splitOffsets, FileScanTask
fileScanTask) {
+ this.splitOffsets = splitOffsets;
+ this.parentScanTask = fileScanTask;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return idx < splitOffsets.size();
+ }
+
+ @Override
+ public FileScanTask next() {
+ long start = splitOffsets.get(idx);
+ idx++;
+ long end = hasNext() ? splitOffsets.get(idx) : parentScanTask.length();
+ return new SplitScanTask(start, end - start, parentScanTask);
+ }
+ }
+
+ @VisibleForTesting
+ static final class FixedSizeSplitScanTaskIterator implements
Iterator<FileScanTask> {
private long offset;
private long remainingLen;
private long splitSize;
private final FileScanTask fileScanTask;
- SplitScanTaskIterator(long splitSize, FileScanTask fileScanTask) {
+ FixedSizeSplitScanTaskIterator(long splitSize, FileScanTask fileScanTask) {
this.offset = 0;
this.remainingLen = fileScanTask.length();
this.splitSize = splitSize;
diff --git a/core/src/test/java/org/apache/iceberg/MockFileScanTask.java b/core/src/test/java/org/apache/iceberg/MockFileScanTask.java
new file mode 100644
index 0000000..10b235a
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/MockFileScanTask.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+public class MockFileScanTask extends BaseFileScanTask {
+
+ private final long length;
+
+ public MockFileScanTask(long length) {
+ super(null, null, null, null);
+ this.length = length;
+ }
+
+ @Override
+ public long length() {
+ return length;
+ }
+}
diff --git a/core/src/test/java/org/apache/iceberg/TestSplitScanTaskIterator.java b/core/src/test/java/org/apache/iceberg/TestFixedSizeSplitScanTaskIterator.java
similarity index 77%
copy from core/src/test/java/org/apache/iceberg/TestSplitScanTaskIterator.java
copy to core/src/test/java/org/apache/iceberg/TestFixedSizeSplitScanTaskIterator.java
index b37caee..bac47e0 100644
--- a/core/src/test/java/org/apache/iceberg/TestSplitScanTaskIterator.java
+++ b/core/src/test/java/org/apache/iceberg/TestFixedSizeSplitScanTaskIterator.java
@@ -24,9 +24,9 @@ import java.util.List;
import org.junit.Assert;
import org.junit.Test;
-import static org.apache.iceberg.BaseFileScanTask.SplitScanTaskIterator;
+import static
org.apache.iceberg.BaseFileScanTask.FixedSizeSplitScanTaskIterator;
-public class TestSplitScanTaskIterator {
+public class TestFixedSizeSplitScanTaskIterator {
@Test
public void testSplits() {
verify(15L, 100L, asList(
@@ -37,7 +37,8 @@ public class TestSplitScanTaskIterator {
}
private static void verify(long splitSize, long fileLen, List<List<Long>>
offsetLenPairs) {
- List<FileScanTask> tasks = Lists.newArrayList(new
SplitScanTaskIterator(splitSize, new MockFileScanTask(fileLen)));
+ List<FileScanTask> tasks = Lists.newArrayList(
+ new FixedSizeSplitScanTaskIterator(splitSize, new
MockFileScanTask(fileLen)));
for (int i = 0; i < tasks.size(); i++) {
FileScanTask task = tasks.get(i);
List<Long> split = offsetLenPairs.get(i);
@@ -48,20 +49,6 @@ public class TestSplitScanTaskIterator {
}
}
- private static class MockFileScanTask extends BaseFileScanTask {
- private final long length;
-
- MockFileScanTask(long length) {
- super(null, null, null, null);
- this.length = length;
- }
-
- @Override
- public long length() {
- return length;
- }
- }
-
private <T> List<T> asList(T... items) {
return Lists.newArrayList(items);
}
diff --git a/core/src/test/java/org/apache/iceberg/TestSplitScanTaskIterator.java b/core/src/test/java/org/apache/iceberg/TestOffsetsBasedSplitScanTaskIterator.java
similarity index 56%
rename from core/src/test/java/org/apache/iceberg/TestSplitScanTaskIterator.java
rename to core/src/test/java/org/apache/iceberg/TestOffsetsBasedSplitScanTaskIterator.java
index b37caee..0b70ebb 100644
--- a/core/src/test/java/org/apache/iceberg/TestSplitScanTaskIterator.java
+++ b/core/src/test/java/org/apache/iceberg/TestOffsetsBasedSplitScanTaskIterator.java
@@ -24,20 +24,28 @@ import java.util.List;
import org.junit.Assert;
import org.junit.Test;
-import static org.apache.iceberg.BaseFileScanTask.SplitScanTaskIterator;
-
-public class TestSplitScanTaskIterator {
+public class TestOffsetsBasedSplitScanTaskIterator {
@Test
public void testSplits() {
- verify(15L, 100L, asList(
- asList(0L, 15L), asList(15L, 15L), asList(30L, 15L), asList(45L, 15L),
asList(60L, 15L),
- asList(75L, 15L), asList(90L, 10L)));
- verify(10L, 10L, asList(asList(0L, 10L)));
- verify(20L, 10L, asList(asList(0L, 10L)));
+ // case when the last row group has more than one byte
+ verify(asList(4L, 10L, 15L, 18L, 30L, 45L), 48L, asList(
+ asList(4L, 6L), asList(10L, 5L), asList(15L, 3L), asList(18L, 12L),
asList(30L, 15L),
+ asList(45L, 3L)));
+
+ // case when the last row group has 1 byte
+ verify(asList(4L, 10L, 15L, 18L, 30L, 45L), 46L, asList(
+ asList(4L, 6L), asList(10L, 5L), asList(15L, 3L), asList(18L, 12L),
asList(30L, 15L),
+ asList(45L, 1L)));
+
+ // case when there is only one row group
+ verify(asList(4L), 48L, asList(
+ asList(4L, 44L)));
}
- private static void verify(long splitSize, long fileLen, List<List<Long>>
offsetLenPairs) {
- List<FileScanTask> tasks = Lists.newArrayList(new
SplitScanTaskIterator(splitSize, new MockFileScanTask(fileLen)));
+ private static void verify(List<Long> offsetRanges, long fileLen,
List<List<Long>> offsetLenPairs) {
+ List<FileScanTask> tasks = Lists.newArrayList(
+ new
BaseFileScanTask.OffsetsBasedSplitScanTaskIterator(offsetRanges, new
MockFileScanTask(fileLen)));
+ Assert.assertEquals("Number of tasks don't match", offsetLenPairs.size(),
tasks.size());
for (int i = 0; i < tasks.size(); i++) {
FileScanTask task = tasks.get(i);
List<Long> split = offsetLenPairs.get(i);
@@ -48,21 +56,7 @@ public class TestSplitScanTaskIterator {
}
}
- private static class MockFileScanTask extends BaseFileScanTask {
- private final long length;
-
- MockFileScanTask(long length) {
- super(null, null, null, null);
- this.length = length;
- }
-
- @Override
- public long length() {
- return length;
- }
- }
-
- private <T> List<T> asList(T... items) {
+ private static <T> List<T> asList(T... items) {
return Lists.newArrayList(items);
}
}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
index ecf9c8d..0bd4154 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
@@ -25,6 +25,7 @@ import com.google.common.collect.Sets;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -109,11 +110,17 @@ public class ParquetUtil {
toBufferMap(fileSchema, lowerBounds), toBufferMap(fileSchema,
upperBounds));
}
+
+ /**
+ * @return a list of offsets in ascending order determined by the starting position
+ * of the row groups
+ */
public static List<Long> getSplitOffsets(ParquetMetadata md) {
List<Long> splitOffsets = new ArrayList<>(md.getBlocks().size());
for (BlockMetaData blockMetaData : md.getBlocks()) {
splitOffsets.add(blockMetaData.getStartingPos());
}
+ Collections.sort(splitOffsets);
return ImmutableList.copyOf(splitOffsets);
}