This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 549da80 Store split offsets for Parquet files (#186)
549da80 is described below
commit 549da809490976b53a13b14596dd240ed74bce5e
Author: Samarth Jain <[email protected]>
AuthorDate: Thu May 16 14:39:07 2019 -0700
Store split offsets for Parquet files (#186)
---
api/src/main/java/org/apache/iceberg/DataFile.java | 11 +++-
.../java/org/apache/iceberg/io/FileAppender.java | 10 ++++
.../test/java/org/apache/iceberg/TestHelpers.java | 5 ++
.../main/java/org/apache/iceberg/DataFiles.java | 64 ++++++----------------
.../java/org/apache/iceberg/GenericDataFile.java | 41 ++++++--------
.../{ParquetMetrics.java => ParquetUtil.java} | 22 ++++++--
.../iceberg/parquet/ParquetWriteAdapter.java | 8 ++-
.../org/apache/iceberg/parquet/ParquetWriter.java | 9 ++-
...estParquetMetrics.java => TestParquetUtil.java} | 13 ++---
.../org/apache/iceberg/spark/source/Writer.java | 7 ++-
.../org/apache/iceberg/spark/SparkTableUtil.scala | 6 +-
.../iceberg/spark/source/TestParquetScan.java | 4 +-
.../iceberg/spark/source/TestParquetWrite.java | 16 +++++-
13 files changed, 119 insertions(+), 97 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/DataFile.java
b/api/src/main/java/org/apache/iceberg/DataFile.java
index b8c02be..d4c1b67 100644
--- a/api/src/main/java/org/apache/iceberg/DataFile.java
+++ b/api/src/main/java/org/apache/iceberg/DataFile.java
@@ -58,8 +58,9 @@ public interface DataFile {
IntegerType.get(), BinaryType.get())),
optional(128, "upper_bounds", MapType.ofRequired(129, 130,
IntegerType.get(), BinaryType.get())),
- optional(131, "key_metadata", BinaryType.get())
- // NEXT ID TO ASSIGN: 132
+ optional(131, "key_metadata", BinaryType.get()),
+ optional(132, "split_offsets", ListType.ofRequired(133,
LongType.get()))
+ // NEXT ID TO ASSIGN: 134
);
}
@@ -136,4 +137,10 @@ public interface DataFile {
* @return a copy of this data file
*/
DataFile copy();
+
+ /**
+ * @return a list of offsets for file blocks if applicable, null otherwise. When available, this
+ * information is used for planning scan tasks whose boundaries are determined by these offsets.
+ */
+ List<Long> splitOffsets();
}
diff --git a/api/src/main/java/org/apache/iceberg/io/FileAppender.java
b/api/src/main/java/org/apache/iceberg/io/FileAppender.java
index b98859a..74229cc 100644
--- a/api/src/main/java/org/apache/iceberg/io/FileAppender.java
+++ b/api/src/main/java/org/apache/iceberg/io/FileAppender.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.io;
import java.io.Closeable;
import java.util.Iterator;
+import java.util.List;
import org.apache.iceberg.Metrics;
public interface FileAppender<D> extends Closeable {
@@ -45,4 +46,13 @@ public interface FileAppender<D> extends Closeable {
* @return the length of this file. Only valid after the file is closed.
*/
long length();
+
+ /**
+ * @return a list of offsets for file blocks if applicable, null otherwise. When available, this
+ * information is used for planning scan tasks whose boundaries are determined by these offsets.
+ * Only valid after the file is closed.
+ */
+ default List<Long> splitOffsets() {
+ return null;
+ }
}
diff --git a/api/src/test/java/org/apache/iceberg/TestHelpers.java
b/api/src/test/java/org/apache/iceberg/TestHelpers.java
index ab92634..d92f789 100644
--- a/api/src/test/java/org/apache/iceberg/TestHelpers.java
+++ b/api/src/test/java/org/apache/iceberg/TestHelpers.java
@@ -376,5 +376,10 @@ public class TestHelpers {
public DataFile copy() {
return this;
}
+
+ @Override
+ public List<Long> splitOffsets() {
+ return null;
+ }
}
}
diff --git a/core/src/main/java/org/apache/iceberg/DataFiles.java
b/core/src/main/java/org/apache/iceberg/DataFiles.java
index 9a3111a..1819b64 100644
--- a/core/src/main/java/org/apache/iceberg/DataFiles.java
+++ b/core/src/main/java/org/apache/iceberg/DataFiles.java
@@ -20,6 +20,7 @@
package org.apache.iceberg;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Locale;
@@ -106,19 +107,12 @@ public class DataFiles {
location, format, partition, rowCount, stat.getLen());
}
- public static DataFile fromStat(FileStatus stat, PartitionData partition,
Metrics metrics) {
- String location = stat.getPath().toString();
- FileFormat format = FileFormat.fromFileName(location);
- return new GenericDataFile(
- location, format, partition, stat.getLen(), metrics);
- }
-
public static DataFile fromStat(FileStatus stat, PartitionData partition,
Metrics metrics,
- EncryptionKeyMetadata keyMetadata) {
+ EncryptionKeyMetadata keyMetadata, List<Long> splitOffsets) {
String location = stat.getPath().toString();
FileFormat format = FileFormat.fromFileName(location);
return new GenericDataFile(
- location, format, partition, stat.getLen(), metrics,
keyMetadata.buffer());
+ location, format, partition, stat.getLen(), metrics,
keyMetadata.buffer(), splitOffsets);
}
public static DataFile fromInputFile(InputFile file, PartitionData
partition, long rowCount) {
@@ -132,17 +126,6 @@ public class DataFiles {
location, format, partition, rowCount, file.getLength());
}
- public static DataFile fromInputFile(InputFile file, PartitionData
partition, Metrics metrics) {
- if (file instanceof HadoopInputFile) {
- return fromStat(((HadoopInputFile) file).getStat(), partition, metrics);
- }
-
- String location = file.location();
- FileFormat format = FileFormat.fromFileName(location);
- return new GenericDataFile(
- location, format, partition, file.getLength(), metrics);
- }
-
public static DataFile fromInputFile(InputFile file, long rowCount) {
if (file instanceof HadoopInputFile) {
return fromStat(((HadoopInputFile) file).getStat(), rowCount);
@@ -154,37 +137,17 @@ public class DataFiles {
}
public static DataFile fromEncryptedOutputFile(EncryptedOutputFile
encryptedFile, PartitionData partition,
- Metrics metrics) {
+ Metrics metrics, List<Long>
splitOffsets) {
EncryptionKeyMetadata keyMetadata = encryptedFile.keyMetadata();
InputFile file = encryptedFile.encryptingOutputFile().toInputFile();
if (encryptedFile instanceof HadoopInputFile) {
- return fromStat(((HadoopInputFile) file).getStat(), partition, metrics,
keyMetadata);
+ return fromStat(((HadoopInputFile) file).getStat(), partition, metrics,
keyMetadata, splitOffsets);
}
String location = file.location();
FileFormat format = FileFormat.fromFileName(location);
return new GenericDataFile(
- location, format, partition, file.getLength(), metrics,
keyMetadata.buffer());
- }
-
- public static DataFile fromParquetInputFile(InputFile file,
- PartitionData partition,
- Metrics metrics) {
- if (file instanceof HadoopInputFile) {
- return fromParquetStat(((HadoopInputFile) file).getStat(), partition,
metrics);
- }
-
- String location = file.location();
- FileFormat format = FileFormat.PARQUET;
- return new GenericDataFile(
- location, format, partition, file.getLength(), metrics);
- }
-
- public static DataFile fromParquetStat(FileStatus stat, PartitionData
partition, Metrics metrics) {
- String location = stat.getPath().toString();
- FileFormat format = FileFormat.PARQUET;
- return new GenericDataFile(
- location, format, partition, stat.getLen(), metrics);
+ location, format, partition, file.getLength(), metrics,
keyMetadata.buffer(), splitOffsets);
}
public static Builder builder(PartitionSpec spec) {
@@ -211,6 +174,7 @@ public class DataFiles {
private Map<Integer, ByteBuffer> lowerBounds = null;
private Map<Integer, ByteBuffer> upperBounds = null;
private ByteBuffer keyMetadata = null;
+ private List<Long> splitOffsets = null;
public Builder() {
this.spec = null;
@@ -237,6 +201,7 @@ public class DataFiles {
this.nullValueCounts = null;
this.lowerBounds = null;
this.upperBounds = null;
+ this.splitOffsets = null;
}
public Builder copy(DataFile toCopy) {
@@ -254,6 +219,7 @@ public class DataFiles {
this.upperBounds = toCopy.upperBounds();
this.keyMetadata = toCopy.keyMetadata() == null ? null
: ByteBuffers.copy(toCopy.keyMetadata());
+ this.splitOffsets = toCopy.splitOffsets() == null ? null :
ImmutableList.copyOf(toCopy.splitOffsets());
return this;
}
@@ -327,6 +293,11 @@ public class DataFiles {
return this;
}
+ public Builder withSplitOffsets(List<Long> offsets) {
+ this.splitOffsets = offsets == null ? null :
ImmutableList.copyOf(offsets);
+ return this;
+ }
+
public Builder withEncryptionKeyMetadata(ByteBuffer newKeyMetadata) {
this.keyMetadata = newKeyMetadata;
return this;
@@ -336,10 +307,6 @@ public class DataFiles {
return withEncryptionKeyMetadata(newKeyMetadata.buffer());
}
- public Builder withEncryptionKeyMetadata(byte[] newKeyMetadata) {
- return withEncryptionKeyMetadata(ByteBuffer.wrap(newKeyMetadata));
- }
-
public DataFile build() {
Preconditions.checkArgument(filePath != null, "File path is required");
if (format == null) {
@@ -352,7 +319,8 @@ public class DataFiles {
return new GenericDataFile(
filePath, format, isPartitioned ? partitionData.copy() : null,
fileSizeInBytes, new Metrics(
- recordCount, columnSizes, valueCounts, nullValueCounts,
lowerBounds, upperBounds), keyMetadata);
+ recordCount, columnSizes, valueCounts, nullValueCounts,
lowerBounds, upperBounds),
+ keyMetadata, splitOffsets);
}
}
}
diff --git a/core/src/main/java/org/apache/iceberg/GenericDataFile.java
b/core/src/main/java/org/apache/iceberg/GenericDataFile.java
index e38217d..7544808 100644
--- a/core/src/main/java/org/apache/iceberg/GenericDataFile.java
+++ b/core/src/main/java/org/apache/iceberg/GenericDataFile.java
@@ -63,6 +63,7 @@ class GenericDataFile
private Map<Integer, Long> nullValueCounts = null;
private Map<Integer, ByteBuffer> lowerBounds = null;
private Map<Integer, ByteBuffer> upperBounds = null;
+ private List<Long> splitOffsets = null;
private transient ByteBuffer keyMetadata = null;
// cached schema
@@ -115,14 +116,6 @@ class GenericDataFile
this.partitionType = EMPTY_PARTITION_DATA.getPartitionType();
this.recordCount = recordCount;
this.fileSizeInBytes = fileSizeInBytes;
- this.fileOrdinal = null;
- this.sortColumns = null;
- this.columnSizes = null;
- this.valueCounts = null;
- this.nullValueCounts = null;
- this.lowerBounds = null;
- this.upperBounds = null;
- this.fromProjectionPos = null;
}
GenericDataFile(String filePath, FileFormat format, PartitionData partition,
@@ -133,18 +126,10 @@ class GenericDataFile
this.partitionType = partition.getPartitionType();
this.recordCount = recordCount;
this.fileSizeInBytes = fileSizeInBytes;
- this.fileOrdinal = null;
- this.sortColumns = null;
- this.columnSizes = null;
- this.valueCounts = null;
- this.nullValueCounts = null;
- this.lowerBounds = null;
- this.upperBounds = null;
- this.fromProjectionPos = null;
}
GenericDataFile(String filePath, FileFormat format, PartitionData partition,
- long fileSizeInBytes, Metrics metrics) {
+ long fileSizeInBytes, Metrics metrics, List<Long>
splitOffsets) {
this.filePath = filePath;
this.format = format;
@@ -160,20 +145,18 @@ class GenericDataFile
// this will throw NPE if metrics.recordCount is null
this.recordCount = metrics.recordCount();
this.fileSizeInBytes = fileSizeInBytes;
- this.fileOrdinal = null;
- this.sortColumns = null;
this.columnSizes = metrics.columnSizes();
this.valueCounts = metrics.valueCounts();
this.nullValueCounts = metrics.nullValueCounts();
this.lowerBounds = SerializableByteBufferMap.wrap(metrics.lowerBounds());
this.upperBounds = SerializableByteBufferMap.wrap(metrics.upperBounds());
- this.fromProjectionPos = null;
+ this.splitOffsets = copy(splitOffsets);
}
GenericDataFile(String filePath, FileFormat format, PartitionData partition,
long fileSizeInBytes, Metrics metrics,
- ByteBuffer keyMetadata) {
- this(filePath, format, partition, fileSizeInBytes, metrics);
+ ByteBuffer keyMetadata, List<Long> splitOffsets) {
+ this(filePath, format, partition, fileSizeInBytes, metrics, splitOffsets);
this.keyMetadata = keyMetadata;
}
@@ -199,6 +182,7 @@ class GenericDataFile
this.upperBounds =
SerializableByteBufferMap.wrap(copy(toCopy.upperBounds));
this.fromProjectionPos = toCopy.fromProjectionPos;
this.keyMetadata = toCopy.keyMetadata == null ? null :
ByteBuffers.copy(toCopy.keyMetadata);
+ this.splitOffsets = copy(toCopy.splitOffsets);
}
/**
@@ -273,6 +257,11 @@ class GenericDataFile
}
@Override
+ public List<Long> splitOffsets() {
+ return splitOffsets;
+ }
+
+ @Override
public org.apache.avro.Schema getSchema() {
if (avroSchema == null) {
this.avroSchema = getAvroSchema(partitionType);
@@ -331,6 +320,9 @@ class GenericDataFile
case 13:
this.keyMetadata = (ByteBuffer) v;
return;
+ case 14:
+ this.splitOffsets = (List<Long>) v;
+ return;
default:
// ignore the object, it must be from a newer version of the format
}
@@ -379,6 +371,8 @@ class GenericDataFile
return upperBounds;
case 13:
return keyMetadata;
+ case 14:
+ return splitOffsets;
default:
throw new UnsupportedOperationException("Unknown field ordinal: " +
pos);
}
@@ -398,7 +392,7 @@ class GenericDataFile
@Override
public int size() {
- return 14;
+ return 15;
}
@Override
@@ -415,6 +409,7 @@ class GenericDataFile
.add("lower_bounds", lowerBounds)
.add("upper_bounds", upperBounds)
.add("key_metadata", keyMetadata == null ? "null" : "(redacted)")
+ .add("split_offsets", splitOffsets == null ? "null" : splitOffsets)
.toString();
}
diff --git
a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetrics.java
b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
similarity index 90%
rename from parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetrics.java
rename to parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
index 678d24f..ecf9c8d 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetrics.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
@@ -19,11 +19,12 @@
package org.apache.iceberg.parquet;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.IOException;
-import java.io.Serializable;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -46,19 +47,20 @@ import org.apache.parquet.schema.MessageType;
import static
org.apache.iceberg.parquet.ParquetConversions.fromParquetPrimitive;
-public class ParquetMetrics implements Serializable {
- private ParquetMetrics() {
+public class ParquetUtil {
+ // not meant to be instantiated
+ private ParquetUtil() {
}
- public static Metrics fromInputFile(InputFile file) {
+ public static Metrics fileMetrics(InputFile file) {
try (ParquetFileReader reader =
ParquetFileReader.open(ParquetIO.file(file))) {
- return fromMetadata(reader.getFooter());
+ return footerMetrics(reader.getFooter());
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to read footer of file: %s",
file);
}
}
- public static Metrics fromMetadata(ParquetMetadata metadata) {
+ public static Metrics footerMetrics(ParquetMetadata metadata) {
long rowCount = 0;
Map<Integer, Long> columnSizes = Maps.newHashMap();
Map<Integer, Long> valueCounts = Maps.newHashMap();
@@ -107,6 +109,14 @@ public class ParquetMetrics implements Serializable {
toBufferMap(fileSchema, lowerBounds), toBufferMap(fileSchema,
upperBounds));
}
+ public static List<Long> getSplitOffsets(ParquetMetadata md) {
+ List<Long> splitOffsets = new ArrayList<>(md.getBlocks().size());
+ for (BlockMetaData blockMetaData : md.getBlocks()) {
+ splitOffsets.add(blockMetaData.getStartingPos());
+ }
+ return ImmutableList.copyOf(splitOffsets);
+ }
+
// we allow struct nesting, but not maps or arrays
private static boolean shouldStoreBounds(ColumnPath columnPath, Schema
schema) {
Iterator<String> pathIterator = columnPath.iterator();
diff --git
a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteAdapter.java
b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteAdapter.java
index 7d31e0c..292eb89 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteAdapter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteAdapter.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.parquet;
import com.google.common.base.Preconditions;
import java.io.IOException;
+import java.util.List;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.FileAppender;
@@ -47,7 +48,7 @@ public class ParquetWriteAdapter<D> implements
FileAppender<D> {
@Override
public Metrics metrics() {
Preconditions.checkState(footer != null, "Cannot produce metrics until
closed");
- return ParquetMetrics.fromMetadata(footer);
+ return ParquetUtil.footerMetrics(footer);
}
@Override
@@ -58,6 +59,11 @@ public class ParquetWriteAdapter<D> implements
FileAppender<D> {
}
@Override
+ public List<Long> splitOffsets() {
+ return ParquetUtil.getSplitOffsets(writer.getFooter());
+ }
+
+ @Override
public void close() throws IOException {
if (writer != null) {
writer.close();
diff --git
a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
index 8ad126e..c7bd6e2 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.parquet;
import com.google.common.collect.ImmutableMap;
import java.io.Closeable;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
@@ -44,7 +45,6 @@ import org.apache.parquet.schema.MessageType;
import static java.lang.Math.max;
import static java.lang.Math.min;
import static org.apache.iceberg.parquet.ParquetSchemaUtil.convert;
-import static
org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;
class ParquetWriter<T> implements FileAppender<T>, Closeable {
private static final DynConstructors.Ctor<PageWriteStore> pageStoreCtor =
DynConstructors
@@ -115,7 +115,7 @@ class ParquetWriter<T> implements FileAppender<T>,
Closeable {
@Override
public Metrics metrics() {
- return ParquetMetrics.fromMetadata(writer.getFooter());
+ return ParquetUtil.footerMetrics(writer.getFooter());
}
@Override
@@ -127,6 +127,11 @@ class ParquetWriter<T> implements FileAppender<T>,
Closeable {
}
}
+ @Override
+ public List<Long> splitOffsets() {
+ return ParquetUtil.getSplitOffsets(writer.getFooter());
+ }
+
private void checkSize() {
if (recordCount >= nextCheckRecordCount) {
long bufferedSize = writeStore.getBufferedSize();
diff --git
a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java
b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetUtil.java
similarity index 96%
rename from
parquet/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java
rename to parquet/src/test/java/org/apache/iceberg/parquet/TestParquetUtil.java
index ec03850..1b681d3 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetUtil.java
@@ -60,8 +60,7 @@ import static
org.apache.iceberg.types.Conversions.fromByteBuffer;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
-public class TestParquetMetrics extends BaseParquetWritingTest {
-
+public class TestParquetUtil extends BaseParquetWritingTest {
private final UUID uuid = UUID.randomUUID();
private final GenericFixed fixed = new GenericData.Fixed(
org.apache.avro.Schema.createFixed("fixedCol", null, null, 4),
@@ -116,7 +115,7 @@ public class TestParquetMetrics extends
BaseParquetWritingTest {
File parquetFile = writeRecords(schema, firstRecord, secondRecord);
- Metrics metrics = ParquetMetrics.fromInputFile(localInput(parquetFile));
+ Metrics metrics = ParquetUtil.fileMetrics(localInput(parquetFile));
Assert.assertEquals(2L, (long) metrics.recordCount());
assertCounts(1, 2L, 0L, metrics);
assertBounds(1, BooleanType.get(), false, true, metrics);
@@ -163,7 +162,7 @@ public class TestParquetMetrics extends
BaseParquetWritingTest {
File parquetFile = writeRecords(schema, record);
- Metrics metrics = ParquetMetrics.fromInputFile(localInput(parquetFile));
+ Metrics metrics = ParquetUtil.fileMetrics(localInput(parquetFile));
Assert.assertEquals(1L, (long) metrics.recordCount());
assertCounts(1, 1L, 0L, metrics);
assertBounds(1, DecimalType.of(4, 2), new BigDecimal("2.55"), new
BigDecimal("2.55"), metrics);
@@ -200,7 +199,7 @@ public class TestParquetMetrics extends
BaseParquetWritingTest {
File parquetFile = writeRecords(schema, record);
- Metrics metrics = ParquetMetrics.fromInputFile(localInput(parquetFile));
+ Metrics metrics = ParquetUtil.fileMetrics(localInput(parquetFile));
Assert.assertEquals(1L, (long) metrics.recordCount());
assertCounts(1, 1L, 0L, metrics);
assertBounds(1, IntegerType.get(), Integer.MAX_VALUE, Integer.MAX_VALUE,
metrics);
@@ -235,7 +234,7 @@ public class TestParquetMetrics extends
BaseParquetWritingTest {
File parquetFile = writeRecords(schema, record);
- Metrics metrics = ParquetMetrics.fromInputFile(localInput(parquetFile));
+ Metrics metrics = ParquetUtil.fileMetrics(localInput(parquetFile));
Assert.assertEquals(1L, (long) metrics.recordCount());
assertCounts(1, 1, 0, metrics);
assertBounds(1, IntegerType.get(), null, null, metrics);
@@ -259,7 +258,7 @@ public class TestParquetMetrics extends
BaseParquetWritingTest {
File parquetFile = writeRecords(schema, firstRecord, secondRecord);
- Metrics metrics = ParquetMetrics.fromInputFile(localInput(parquetFile));
+ Metrics metrics = ParquetUtil.fileMetrics(localInput(parquetFile));
Assert.assertEquals(2L, (long) metrics.recordCount());
assertCounts(1, 2, 2, metrics);
assertBounds(1, IntegerType.get(), null, null, metrics);
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
index 4124136..c202671 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -244,6 +244,7 @@ class Writer implements DataSourceWriter {
private final FileIO fileIo;
private FileAppender<InternalRow> appender = null;
private Metrics metrics = null;
+ private List<Long> offsetRanges = null;
private final EncryptedOutputFile file;
UnpartitionedWriter(
@@ -265,6 +266,7 @@ class Writer implements DataSourceWriter {
public WriterCommitMessage commit() throws IOException {
Preconditions.checkArgument(appender != null, "Commit called on a closed
writer: %s", this);
+ // metrics and splitOffsets are populated on close
close();
if (metrics.recordCount() == 0L) {
@@ -272,7 +274,7 @@ class Writer implements DataSourceWriter {
return new TaskCommit();
}
- DataFile dataFile = DataFiles.fromEncryptedOutputFile(file, null,
metrics);
+ DataFile dataFile = DataFiles.fromEncryptedOutputFile(file, null,
metrics, offsetRanges);
return new TaskCommit(dataFile);
}
@@ -290,6 +292,7 @@ class Writer implements DataSourceWriter {
if (this.appender != null) {
this.appender.close();
this.metrics = appender.metrics();
+ this.offsetRanges = appender.splitOffsets();
this.appender = null;
}
}
@@ -371,12 +374,14 @@ class Writer implements DataSourceWriter {
currentAppender.close();
// metrics are only valid after the appender is closed
Metrics metrics = currentAppender.metrics();
+ List<Long> splitOffsets = currentAppender.splitOffsets();
this.currentAppender = null;
DataFile dataFile = DataFiles.builder(spec)
.withEncryptedOutputFile(currentFile)
.withPartition(currentKey)
.withMetrics(metrics)
+ .withSplitOffsets(splitOffsets)
.build();
completedPartitions.add(currentKey);
diff --git a/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
b/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
index ca835d6..86dfbe3 100644
--- a/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
+++ b/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
@@ -21,17 +21,15 @@ package org.apache.iceberg.spark
import java.nio.ByteBuffer
import java.util
-
import com.google.common.collect.Maps
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, PathFilter}
+import org.apache.iceberg.parquet.ParquetUtil
import org.apache.iceberg.{DataFile, DataFiles, Metrics, PartitionSpec}
-import org.apache.iceberg.parquet.ParquetMetrics
import org.apache.iceberg.spark.hacks.Hive
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
-
import scala.collection.JavaConverters._
object SparkTableUtil {
@@ -236,7 +234,7 @@ object SparkTableUtil {
val fs = partition.getFileSystem(conf)
fs.listStatus(partition, HiddenPathFilter).filter(_.isFile).map { stat =>
- val metrics =
ParquetMetrics.fromMetadata(ParquetFileReader.readFooter(conf, stat))
+ val metrics =
ParquetUtil.footerMetrics(ParquetFileReader.readFooter(conf, stat))
SparkDataFile(
stat.getPath.toString,
diff --git
a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetScan.java
b/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetScan.java
index 0475f40..ced85d4 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetScan.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetScan.java
@@ -51,7 +51,7 @@ import org.junit.rules.TemporaryFolder;
import static org.apache.iceberg.Files.localInput;
import static org.apache.iceberg.Files.localOutput;
-import static org.apache.iceberg.parquet.ParquetMetrics.fromInputFile;
+import static org.apache.iceberg.parquet.ParquetUtil.fileMetrics;
public class TestParquetScan extends AvroDataTest {
private static final Configuration CONF = new Configuration();
@@ -106,7 +106,7 @@ public class TestParquetScan extends AvroDataTest {
DataFile file = DataFiles.builder(PartitionSpec.unpartitioned())
.withFileSizeInBytes(parquetFile.length())
.withPath(parquetFile.toString())
- .withMetrics(fromInputFile(localInput(parquetFile)))
+ .withMetrics(fileMetrics(localInput(parquetFile)))
.build();
table.newAppend().appendFile(file).commit();
diff --git
a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java
b/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java
index 59504b1..59d8e7f 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java
@@ -24,6 +24,9 @@ import java.io.File;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestReader;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
@@ -40,6 +43,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import static org.apache.iceberg.Files.localInput;
import static org.apache.iceberg.types.Types.NestedField.optional;
public class TestParquetWrite {
@@ -96,8 +100,18 @@ public class TestParquetWrite {
.load(location.toString());
List<SimpleRecord> actual =
result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
-
Assert.assertEquals("Number of rows should match", expected.size(),
actual.size());
Assert.assertEquals("Result rows should match", expected, actual);
+ for (ManifestFile manifest : table.currentSnapshot().manifests()) {
+ for (DataFile file : ManifestReader.read(localInput(manifest.path()),
null)) {
+ Assert.assertNotNull("Split offsets not present", file.splitOffsets());
+ Assert.assertEquals("Should have reported record count as 1" , 1,
file.recordCount());
+ Assert.assertNotNull("Column sizes metric not present",
file.columnSizes());
+ Assert.assertNotNull("Counts metric not present", file.valueCounts());
+ Assert.assertNotNull("Null value counts metric not present",
file.nullValueCounts());
+ Assert.assertNotNull("Lower bounds metric not present",
file.lowerBounds());
+ Assert.assertNotNull("Upper bounds metric not present",
file.upperBounds());
+ }
+ }
}
}