This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c3b0d7  Core: Add position in manifest to DataFile and DeleteFile (#1723)
7c3b0d7 is described below

commit 7c3b0d750ecef76e0c9a33b66938d0d8602857b8
Author: Ryan Blue <[email protected]>
AuthorDate: Tue Nov 10 13:36:13 2020 -0800

    Core: Add position in manifest to DataFile and DeleteFile (#1723)
---
 api/src/main/java/org/apache/iceberg/ContentFile.java  |  5 +++++
 api/src/test/java/org/apache/iceberg/TestHelpers.java  |  5 +++++
 core/src/main/java/org/apache/iceberg/BaseFile.java    | 18 +++++++++++++++++-
 .../main/java/org/apache/iceberg/ManifestReader.java   |  6 +++++-
 core/src/main/java/org/apache/iceberg/V1Metadata.java  |  5 +++++
 core/src/main/java/org/apache/iceberg/V2Metadata.java  |  5 +++++
 .../org/apache/iceberg/avro/GenericAvroReader.java     | 10 +++++++++-
 .../java/org/apache/iceberg/avro/ValueReaders.java     | 17 +++++++++++++----
 .../java/org/apache/iceberg/TestManifestReader.java    | 11 +++++++++++
 .../java/org/apache/iceberg/spark/SparkDataFile.java   |  5 +++++
 10 files changed, 80 insertions(+), 7 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/ContentFile.java 
b/api/src/main/java/org/apache/iceberg/ContentFile.java
index 102a807..dca2fec 100644
--- a/api/src/main/java/org/apache/iceberg/ContentFile.java
+++ b/api/src/main/java/org/apache/iceberg/ContentFile.java
@@ -30,6 +30,11 @@ import java.util.Map;
  */
 public interface ContentFile<F> {
   /**
+   * Returns the ordinal position of the file in a manifest, or null if it was 
not read from a manifest.
+   */
+  Long pos();
+
+  /**
    * Returns id of the partition spec used for partition metadata.
    */
   int specId();
diff --git a/api/src/test/java/org/apache/iceberg/TestHelpers.java 
b/api/src/test/java/org/apache/iceberg/TestHelpers.java
index aa1a8ca..efdceba 100644
--- a/api/src/test/java/org/apache/iceberg/TestHelpers.java
+++ b/api/src/test/java/org/apache/iceberg/TestHelpers.java
@@ -333,6 +333,11 @@ public class TestHelpers {
     }
 
     @Override
+    public Long pos() {
+      return null;
+    }
+
+    @Override
     public int specId() {
       return 0;
     }
diff --git a/core/src/main/java/org/apache/iceberg/BaseFile.java 
b/core/src/main/java/org/apache/iceberg/BaseFile.java
index 1d74671..9fb79c8 100644
--- a/core/src/main/java/org/apache/iceberg/BaseFile.java
+++ b/core/src/main/java/org/apache/iceberg/BaseFile.java
@@ -31,6 +31,7 @@ import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.specific.SpecificData;
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
@@ -53,6 +54,7 @@ abstract class BaseFile<F>
   private int[] fromProjectionPos;
   private Types.StructType partitionType;
 
+  private Long fileOrdinal = null;
   private int partitionSpecId = -1;
   private FileContent content = FileContent.DATA;
   private String filePath = null;
@@ -91,7 +93,10 @@ abstract class BaseFile<F>
     }
 
     List<Types.NestedField> fields = schema.fields();
-    List<Types.NestedField> allFields = 
DataFile.getType(partitionType).fields();
+    List<Types.NestedField> allFields = Lists.newArrayList();
+    allFields.addAll(DataFile.getType(partitionType).fields());
+    allFields.add(MetadataColumns.ROW_POSITION);
+
     this.fromProjectionPos = new int[fields.size()];
     for (int i = 0; i < fromProjectionPos.length; i += 1) {
       boolean found = false;
@@ -149,6 +154,7 @@ abstract class BaseFile<F>
    * @param fullCopy whether to copy all fields or to drop column-level stats
    */
   BaseFile(BaseFile<F> toCopy, boolean fullCopy) {
+    this.fileOrdinal = toCopy.fileOrdinal;
     this.partitionSpecId = toCopy.partitionSpecId;
     this.content = toCopy.content;
     this.filePath = toCopy.filePath;
@@ -255,6 +261,9 @@ abstract class BaseFile<F>
       case 13:
         this.equalityIds = ArrayUtil.toIntArray((List<Integer>) value);
         return;
+      case 14:
+        this.fileOrdinal = (Long) value; // boxed cast: null stays null, no NPE
+        return;
       default:
         // ignore the object, it must be from a newer version of the format
     }
@@ -301,6 +310,8 @@ abstract class BaseFile<F>
         return splitOffsets();
       case 13:
         return equalityFieldIds();
+      case 14:
+        return fileOrdinal; // not `pos`: that is the projected field ordinal
       default:
         throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
     }
@@ -317,6 +328,11 @@ abstract class BaseFile<F>
   }
 
   @Override
+  public Long pos() {
+    return fileOrdinal;
+  }
+
+  @Override
   public FileContent content() {
     return content;
   }
diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java 
b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index 939de8d..f791079 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -189,10 +189,14 @@ public class ManifestReader<F extends ContentFile<F>>
     FileFormat format = FileFormat.fromFileName(file.location());
     Preconditions.checkArgument(format != null, "Unable to determine format of 
manifest: %s", file);
 
+    List<Types.NestedField> fields = Lists.newArrayList();
+    fields.addAll(projection.asStruct().fields());
+    fields.add(MetadataColumns.ROW_POSITION);
+
     switch (format) {
       case AVRO:
         AvroIterable<ManifestEntry<F>> reader = Avro.read(file)
-            .project(ManifestEntry.wrapFileSchema(projection.asStruct()))
+            .project(ManifestEntry.wrapFileSchema(Types.StructType.of(fields)))
             .rename("manifest_entry", GenericManifestEntry.class.getName())
             .rename("partition", PartitionData.class.getName())
             .rename("r102", PartitionData.class.getName())
diff --git a/core/src/main/java/org/apache/iceberg/V1Metadata.java 
b/core/src/main/java/org/apache/iceberg/V1Metadata.java
index cb6c922..4b11860 100644
--- a/core/src/main/java/org/apache/iceberg/V1Metadata.java
+++ b/core/src/main/java/org/apache/iceberg/V1Metadata.java
@@ -365,6 +365,11 @@ class V1Metadata {
     }
 
     @Override
+    public Long pos() {
+      return null;
+    }
+
+    @Override
     public int specId() {
       return wrapped.specId();
     }
diff --git a/core/src/main/java/org/apache/iceberg/V2Metadata.java 
b/core/src/main/java/org/apache/iceberg/V2Metadata.java
index 94ff1da..55a91c9 100644
--- a/core/src/main/java/org/apache/iceberg/V2Metadata.java
+++ b/core/src/main/java/org/apache/iceberg/V2Metadata.java
@@ -416,6 +416,11 @@ class V2Metadata {
     }
 
     @Override
+    public Long pos() {
+      return null;
+    }
+
+    @Override
     public int specId() {
       return wrapped.specId();
     }
diff --git a/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java 
b/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java
index c8a81ca..5957cae 100644
--- a/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java
+++ b/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.avro;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.function.Supplier;
 import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
@@ -30,7 +31,7 @@ import org.apache.avro.io.Decoder;
 import org.apache.iceberg.common.DynClasses;
 import org.apache.iceberg.data.avro.DecoderResolver;
 
-class GenericAvroReader<T> implements DatumReader<T> {
+class GenericAvroReader<T> implements DatumReader<T>, SupportsRowPosition {
 
   private final Schema readSchema;
   private ClassLoader loader = Thread.currentThread().getContextClassLoader();
@@ -57,6 +58,13 @@ class GenericAvroReader<T> implements DatumReader<T> {
   }
 
   @Override
+  public void setRowPositionSupplier(Supplier<Long> posSupplier) {
+    if (reader instanceof SupportsRowPosition) {
+      ((SupportsRowPosition) reader).setRowPositionSupplier(posSupplier);
+    }
+  }
+
+  @Override
   public T read(T reuse, Decoder decoder) throws IOException {
     return DecoderResolver.resolveAndRead(decoder, readSchema, fileSchema, 
reader, reuse);
   }
diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java 
b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
index e69c111..5e72c9b 100644
--- a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
+++ b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
@@ -580,10 +580,19 @@ public class ValueReaders {
     private final Object[] constants;
     private int posField = -1;
 
-    protected StructReader(List<ValueReader<?>> readers) {
+    protected StructReader(List<ValueReader<?>> readers, Schema schema) {
       this.readers = readers.toArray(new ValueReader[0]);
       this.positions = new int[0];
       this.constants = new Object[0];
+
+      List<Schema.Field> fields = schema.getFields();
+      for (int pos = 0; pos < fields.size(); pos += 1) {
+        Schema.Field field = fields.get(pos);
+        if (AvroSchemaUtil.getFieldId(field) == 
MetadataColumns.ROW_POSITION.fieldId()) {
+          // track where the _pos field is located for setRowPositionSupplier
+          this.posField = pos;
+        }
+      }
     }
 
     protected StructReader(List<ValueReader<?>> readers, Types.StructType 
struct, Map<Integer, ?> idToConstant) {
@@ -609,7 +618,7 @@ public class ValueReaders {
 
     @Override
     public void setRowPositionSupplier(Supplier<Long> posSupplier) {
-      if (posField > 0) {
+      if (posField >= 0) {
         long startingPos = posSupplier.get();
         this.readers[posField] = new PositionReader(startingPos);
         for (ValueReader<?> reader : readers) {
@@ -667,7 +676,7 @@ public class ValueReaders {
     private final Schema recordSchema;
 
     private RecordReader(List<ValueReader<?>> readers, Schema recordSchema) {
-      super(readers);
+      super(readers, recordSchema);
       this.recordSchema = recordSchema;
     }
 
@@ -697,7 +706,7 @@ public class ValueReaders {
     private final Schema schema;
 
     IndexedRecordReader(List<ValueReader<?>> readers, Class<R> recordClass, 
Schema schema) {
-      super(readers);
+      super(readers, schema);
       this.recordClass = recordClass;
       this.ctor = DynConstructors.builder(IndexedRecord.class)
           .hiddenImpl(recordClass, Schema.class)
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestReader.java 
b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
index c1c2423..1163b39 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestReader.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
@@ -120,4 +120,16 @@ public class TestManifestReader extends TableTestBase {
     }
   }
 
+  @Test
+  public void testDataFilePositions() throws IOException {
+    ManifestFile manifest = writeManifest(1000L, FILE_A, FILE_B, FILE_C);
+    try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)) {
+      long expectedPos = 0L;
+      for (DataFile file : reader) {
+        Assert.assertEquals("Position should match", (Long) expectedPos, file.pos());
+        expectedPos += 1;
+      }
+      Assert.assertEquals("Should read every file in the manifest", 3L, expectedPos);
+    }
+  }
 }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java 
b/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
index 48dd001..14233c9 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
@@ -88,6 +88,11 @@ public class SparkDataFile implements DataFile {
   }
 
   @Override
+  public Long pos() {
+    return null;
+  }
+
+  @Override
   public int specId() {
     return -1;
   }

Reply via email to