This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 7c3b0d7 Core: Add position in manifest to DataFile and DeleteFile (#1723)
7c3b0d7 is described below
commit 7c3b0d750ecef76e0c9a33b66938d0d8602857b8
Author: Ryan Blue <[email protected]>
AuthorDate: Tue Nov 10 13:36:13 2020 -0800
Core: Add position in manifest to DataFile and DeleteFile (#1723)
---
api/src/main/java/org/apache/iceberg/ContentFile.java | 5 +++++
api/src/test/java/org/apache/iceberg/TestHelpers.java | 5 +++++
core/src/main/java/org/apache/iceberg/BaseFile.java | 18 +++++++++++++++++-
.../main/java/org/apache/iceberg/ManifestReader.java | 6 +++++-
core/src/main/java/org/apache/iceberg/V1Metadata.java | 5 +++++
core/src/main/java/org/apache/iceberg/V2Metadata.java | 5 +++++
.../org/apache/iceberg/avro/GenericAvroReader.java | 10 +++++++++-
.../java/org/apache/iceberg/avro/ValueReaders.java | 17 +++++++++++++----
.../java/org/apache/iceberg/TestManifestReader.java | 11 +++++++++++
.../java/org/apache/iceberg/spark/SparkDataFile.java | 5 +++++
10 files changed, 80 insertions(+), 7 deletions(-)
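
A minimal sketch of what this change enables, assuming an existing ManifestFile and FileIO are at hand (the helper class and method names below are illustrative, not part of this commit):

    import java.io.IOException;

    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.ManifestFile;
    import org.apache.iceberg.ManifestFiles;
    import org.apache.iceberg.ManifestReader;
    import org.apache.iceberg.io.FileIO;

    public class ManifestPositionExample {
      // Print each data file along with its ordinal position in the manifest.
      static void printPositions(ManifestFile manifest, FileIO io) throws IOException {
        try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, io)) {
          for (DataFile file : reader) {
            // pos() is the entry's ordinal in the manifest; it is null for file
            // objects that were not read from a manifest.
            System.out.println(file.pos() + " -> " + file.path());
          }
        }
      }
    }

The new test in TestManifestReader below uses the same read path and checks that positions start at 0 and increase by one per entry.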
diff --git a/api/src/main/java/org/apache/iceberg/ContentFile.java b/api/src/main/java/org/apache/iceberg/ContentFile.java
index 102a807..dca2fec 100644
--- a/api/src/main/java/org/apache/iceberg/ContentFile.java
+++ b/api/src/main/java/org/apache/iceberg/ContentFile.java
@@ -30,6 +30,11 @@ import java.util.Map;
*/
public interface ContentFile<F> {
/**
+ * Returns the ordinal position of the file in a manifest, or null if it was not read from a manifest.
+ */
+ Long pos();
+
+ /**
* Returns id of the partition spec used for partition metadata.
*/
int specId();
diff --git a/api/src/test/java/org/apache/iceberg/TestHelpers.java b/api/src/test/java/org/apache/iceberg/TestHelpers.java
index aa1a8ca..efdceba 100644
--- a/api/src/test/java/org/apache/iceberg/TestHelpers.java
+++ b/api/src/test/java/org/apache/iceberg/TestHelpers.java
@@ -333,6 +333,11 @@ public class TestHelpers {
}
@Override
+ public Long pos() {
+ return null;
+ }
+
+ @Override
public int specId() {
return 0;
}
diff --git a/core/src/main/java/org/apache/iceberg/BaseFile.java b/core/src/main/java/org/apache/iceberg/BaseFile.java
index 1d74671..9fb79c8 100644
--- a/core/src/main/java/org/apache/iceberg/BaseFile.java
+++ b/core/src/main/java/org/apache/iceberg/BaseFile.java
@@ -31,6 +31,7 @@ import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.specific.SpecificData;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
@@ -53,6 +54,7 @@ abstract class BaseFile<F>
private int[] fromProjectionPos;
private Types.StructType partitionType;
+ private Long fileOrdinal = null;
private int partitionSpecId = -1;
private FileContent content = FileContent.DATA;
private String filePath = null;
@@ -91,7 +93,10 @@ abstract class BaseFile<F>
}
List<Types.NestedField> fields = schema.fields();
- List<Types.NestedField> allFields = DataFile.getType(partitionType).fields();
+ List<Types.NestedField> allFields = Lists.newArrayList();
+ allFields.addAll(DataFile.getType(partitionType).fields());
+ allFields.add(MetadataColumns.ROW_POSITION);
+
this.fromProjectionPos = new int[fields.size()];
for (int i = 0; i < fromProjectionPos.length; i += 1) {
boolean found = false;
@@ -149,6 +154,7 @@ abstract class BaseFile<F>
* @param fullCopy whether to copy all fields or to drop column-level stats
*/
BaseFile(BaseFile<F> toCopy, boolean fullCopy) {
+ this.fileOrdinal = toCopy.fileOrdinal;
this.partitionSpecId = toCopy.partitionSpecId;
this.content = toCopy.content;
this.filePath = toCopy.filePath;
@@ -255,6 +261,9 @@ abstract class BaseFile<F>
case 13:
this.equalityIds = ArrayUtil.toIntArray((List<Integer>) value);
return;
+ case 14:
+ this.fileOrdinal = (long) value;
+ return;
default:
// ignore the object, it must be from a newer version of the format
}
@@ -301,6 +310,8 @@ abstract class BaseFile<F>
return splitOffsets();
case 13:
return equalityFieldIds();
+ case 14:
+ return pos;
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
}
@@ -317,6 +328,11 @@ abstract class BaseFile<F>
}
@Override
+ public Long pos() {
+ return fileOrdinal;
+ }
+
+ @Override
public FileContent content() {
return content;
}
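
A hedged sketch of the indexed-record plumbing added above: the Avro value reader fills a file object by calling put(ordinal, value), and ordinal 14, the appended _pos column, is routed into fileOrdinal, which pos() then exposes. The class below is a stand-in for illustration only, not BaseFile itself, and it elides the ordinals below 14:

    // Illustrative stand-in for the put()/pos() wiring in BaseFile.
    class IndexedFileSketch {
      private Long fileOrdinal = null;

      void put(int pos, Object value) {
        switch (pos) {
          case 14:
            // the _pos metadata column appended to the data file schema
            this.fileOrdinal = (long) value;
            return;
          default:
            // other ordinals elided; BaseFile handles 0-13 and ignores
            // anything unknown as a field from a newer format version
            break;
        }
      }

      Long pos() {
        return fileOrdinal;
      }
    }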
diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index 939de8d..f791079 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -189,10 +189,14 @@ public class ManifestReader<F extends ContentFile<F>>
FileFormat format = FileFormat.fromFileName(file.location());
Preconditions.checkArgument(format != null, "Unable to determine format of manifest: %s", file);
+ List<Types.NestedField> fields = Lists.newArrayList();
+ fields.addAll(projection.asStruct().fields());
+ fields.add(MetadataColumns.ROW_POSITION);
+
switch (format) {
case AVRO:
AvroIterable<ManifestEntry<F>> reader = Avro.read(file)
- .project(ManifestEntry.wrapFileSchema(projection.asStruct()))
+ .project(ManifestEntry.wrapFileSchema(Types.StructType.of(fields)))
.rename("manifest_entry", GenericManifestEntry.class.getName())
.rename("partition", PartitionData.class.getName())
.rename("r102", PartitionData.class.getName())
diff --git a/core/src/main/java/org/apache/iceberg/V1Metadata.java b/core/src/main/java/org/apache/iceberg/V1Metadata.java
index cb6c922..4b11860 100644
--- a/core/src/main/java/org/apache/iceberg/V1Metadata.java
+++ b/core/src/main/java/org/apache/iceberg/V1Metadata.java
@@ -365,6 +365,11 @@ class V1Metadata {
}
@Override
+ public Long pos() {
+ return null;
+ }
+
+ @Override
public int specId() {
return wrapped.specId();
}
diff --git a/core/src/main/java/org/apache/iceberg/V2Metadata.java b/core/src/main/java/org/apache/iceberg/V2Metadata.java
index 94ff1da..55a91c9 100644
--- a/core/src/main/java/org/apache/iceberg/V2Metadata.java
+++ b/core/src/main/java/org/apache/iceberg/V2Metadata.java
@@ -416,6 +416,11 @@ class V2Metadata {
}
@Override
+ public Long pos() {
+ return null;
+ }
+
+ @Override
public int specId() {
return wrapped.specId();
}
diff --git a/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java b/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java
index c8a81ca..5957cae 100644
--- a/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java
+++ b/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.avro;
import java.io.IOException;
import java.util.List;
+import java.util.function.Supplier;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
@@ -30,7 +31,7 @@ import org.apache.avro.io.Decoder;
import org.apache.iceberg.common.DynClasses;
import org.apache.iceberg.data.avro.DecoderResolver;
-class GenericAvroReader<T> implements DatumReader<T> {
+class GenericAvroReader<T> implements DatumReader<T>, SupportsRowPosition {
private final Schema readSchema;
private ClassLoader loader = Thread.currentThread().getContextClassLoader();
@@ -57,6 +58,13 @@ class GenericAvroReader<T> implements DatumReader<T> {
}
@Override
+ public void setRowPositionSupplier(Supplier<Long> posSupplier) {
+ if (reader instanceof SupportsRowPosition) {
+ ((SupportsRowPosition) reader).setRowPositionSupplier(posSupplier);
+ }
+ }
+
+ @Override
public T read(T reuse, Decoder decoder) throws IOException {
return DecoderResolver.resolveAndRead(decoder, readSchema, fileSchema, reader, reuse);
}
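
GenericAvroReader now opts into SupportsRowPosition and simply forwards the supplier to the value reader it built for the schema. The same opt-in delegation can be sketched in isolation; the wrapper class name below is illustrative, not part of this commit:

    import java.util.function.Supplier;

    import org.apache.iceberg.avro.SupportsRowPosition;

    // Illustrative wrapper: forward the row-position supplier only when the
    // wrapped reader declares that it can use it.
    class ForwardingPositionAware implements SupportsRowPosition {
      private final Object delegate;

      ForwardingPositionAware(Object delegate) {
        this.delegate = delegate;
      }

      @Override
      public void setRowPositionSupplier(Supplier<Long> posSupplier) {
        if (delegate instanceof SupportsRowPosition) {
          ((SupportsRowPosition) delegate).setRowPositionSupplier(posSupplier);
        }
      }
    }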
diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
index e69c111..5e72c9b 100644
--- a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
+++ b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
@@ -580,10 +580,19 @@ public class ValueReaders {
private final Object[] constants;
private int posField = -1;
- protected StructReader(List<ValueReader<?>> readers) {
+ protected StructReader(List<ValueReader<?>> readers, Schema schema) {
this.readers = readers.toArray(new ValueReader[0]);
this.positions = new int[0];
this.constants = new Object[0];
+
+ List<Schema.Field> fields = schema.getFields();
+ for (int pos = 0; pos < fields.size(); pos += 1) {
+ Schema.Field field = fields.get(pos);
+ if (AvroSchemaUtil.getFieldId(field) == MetadataColumns.ROW_POSITION.fieldId()) {
+ // track where the _pos field is located for setRowPositionSupplier
+ this.posField = pos;
+ }
+ }
}
protected StructReader(List<ValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
@@ -609,7 +618,7 @@ public class ValueReaders {
@Override
public void setRowPositionSupplier(Supplier<Long> posSupplier) {
- if (posField > 0) {
+ if (posField >= 0) {
long startingPos = posSupplier.get();
this.readers[posField] = new PositionReader(startingPos);
for (ValueReader<?> reader : readers) {
@@ -667,7 +676,7 @@ public class ValueReaders {
private final Schema recordSchema;
private RecordReader(List<ValueReader<?>> readers, Schema recordSchema) {
- super(readers);
+ super(readers, recordSchema);
this.recordSchema = recordSchema;
}
@@ -697,7 +706,7 @@ public class ValueReaders {
private final Schema schema;
IndexedRecordReader(List<ValueReader<?>> readers, Class<R> recordClass, Schema schema) {
- super(readers);
+ super(readers, schema);
this.recordClass = recordClass;
this.ctor = DynConstructors.builder(IndexedRecord.class)
.hiddenImpl(recordClass, Schema.class)
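
Two details in the StructReader changes above are easy to miss: the plain constructor now scans the record schema for the field whose id matches MetadataColumns.ROW_POSITION and remembers its ordinal, and the guard in setRowPositionSupplier changed from posField > 0 to posField >= 0 so a position column at ordinal 0 is no longer skipped. When the guard fires, the reader at that ordinal is swapped for a position reader; below is a standalone sketch of that kind of reader (the real PositionReader in ValueReaders may differ in detail):

    import java.io.IOException;

    import org.apache.avro.io.Decoder;
    import org.apache.iceberg.avro.ValueReader;

    // Produces consecutive longs starting at the supplied position and reads
    // nothing from the Avro decoder; the other fields of the record are
    // handled by the surrounding struct reader.
    class PositionSketch implements ValueReader<Long> {
      private long nextPos;

      PositionSketch(long startingPos) {
        this.nextPos = startingPos;
      }

      @Override
      public Long read(Decoder decoder, Object reuse) throws IOException {
        long pos = nextPos;
        nextPos += 1;
        return pos;
      }
    }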
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestReader.java b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
index c1c2423..1163b39 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestReader.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
@@ -120,4 +120,15 @@ public class TestManifestReader extends TableTestBase {
}
}
+ @Test
+ public void testDataFilePositions() throws IOException {
+ ManifestFile manifest = writeManifest(1000L, FILE_A, FILE_B, FILE_C);
+ try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)) {
+ long expectedPos = 0L;
+ for (DataFile file : reader) {
+ Assert.assertEquals("Position should match", (Long) expectedPos, file.pos());
+ expectedPos += 1;
+ }
+ }
+ }
}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java b/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
index 48dd001..14233c9 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
@@ -88,6 +88,11 @@ public class SparkDataFile implements DataFile {
}
@Override
+ public Long pos() {
+ return null;
+ }
+
+ @Override
public int specId() {
return -1;
}