This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 49a9f9946a Avro: Reading files using DataFileStream with ROW LINEAGE if the column isn't projected (#15508)
49a9f9946a is described below

commit 49a9f9946a04121dcf7a9cb9c3f5c68837301dee
Author: Ayush Saxena <[email protected]>
AuthorDate: Mon Mar 16 23:33:00 2026 +0530

    Avro: Reading files using DataFileStream with ROW LINEAGE if the column isn't projected (#15508)
---
 .../java/org/apache/iceberg/avro/ValueReaders.java |  7 ++-
 .../iceberg/data/avro/TestPlannedDataReader.java   | 64 ++++++++++++++++++++++
 2 files changed, 69 insertions(+), 2 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
index 0ac98dabd4..ec46c56c72 100644
--- a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
+++ b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
@@ -254,7 +254,10 @@ public class ValueReaders {
       Integer projectionPos,
       ValueReader<?> fieldReader,
       Map<Integer, ?> idToConstant) {
-    if (Objects.equals(fieldId, MetadataColumns.ROW_ID.fieldId())) {
+    if (projectionPos == null) {
+      // field is in the file but not projected; keep the reader only for skipping
+      return Pair.of(null, fieldReader);
+    } else if (Objects.equals(fieldId, MetadataColumns.ROW_ID.fieldId())) {
       Long firstRowId = (Long) idToConstant.get(fieldId);
+      return Pair.of(projectionPos, ValueReaders.rowIds(firstRowId, fieldReader));
     } else if (Objects.equals(fieldId, MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId())) {
@@ -273,7 +276,7 @@ public class ValueReaders {
       ValueReader<?> fieldReader,
       Map<Integer, ?> idToConstant) {
     Object constant = idToConstant.get(fieldId);
-    if (projectionPos != null && constant != null) {
+    if (constant != null) {
+      return Pair.of(projectionPos, ValueReaders.replaceWithConstant(fieldReader, constant));
     }
 
diff --git a/core/src/test/java/org/apache/iceberg/data/avro/TestPlannedDataReader.java b/core/src/test/java/org/apache/iceberg/data/avro/TestPlannedDataReader.java
index f9ff25d84b..a32a7548ae 100644
--- a/core/src/test/java/org/apache/iceberg/data/avro/TestPlannedDataReader.java
+++ b/core/src/test/java/org/apache/iceberg/data/avro/TestPlannedDataReader.java
@@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.time.LocalDateTime;
 import java.time.OffsetDateTime;
@@ -33,6 +34,7 @@ import java.util.Map;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
+import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
@@ -40,7 +42,10 @@ import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.EncoderFactory;
+import org.apache.iceberg.Files;
 import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.avro.AvroIterable;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -305,6 +310,65 @@ public class TestPlannedDataReader {
         .isEqualTo(10L);
   }
 
+  @Test
+  public void testLineageColumnsNotProjected() throws Exception {
+
+    org.apache.iceberg.Schema icebergSchema =
+        new org.apache.iceberg.Schema(
+            Types.NestedField.required(1, "data", Types.StringType.get()));
+
+    Schema fileSchema =
+        SchemaBuilder.record("test")
+            .fields()
+            .name("data")
+            .type()
+            .stringType()
+            .noDefault()
+            .name(MetadataColumns.ROW_ID.name())
+            .type()
+            .optional()
+            .longType()
+            .name(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name())
+            .type()
+            .optional()
+            .longType()
+            .endRecord();
+
+    fileSchema.getField("data").addProp("field-id", 1);
+    fileSchema
+        .getField(MetadataColumns.ROW_ID.name())
+        .addProp("field-id", MetadataColumns.ROW_ID.fieldId());
+    fileSchema
+        .getField(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name())
+        .addProp("field-id", MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId());
+
+    File file = File.createTempFile("test", ".avro");
+
+    try (DataFileWriter<GenericRecord> writer =
+        new DataFileWriter<>(new GenericDatumWriter<>(fileSchema))) {
+
+      writer.create(fileSchema, file);
+
+      GenericRecord rec = new GenericData.Record(fileSchema);
+      rec.put("data", "a");
+      rec.put(MetadataColumns.ROW_ID.name(), 10L);
+      rec.put(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name(), 5L);
+
+      writer.append(rec);
+    }
+
+    try (AvroIterable<Record> reader =
+        Avro.read(Files.localInput(file))
+            .createResolvingReader(schema -> PlannedDataReader.create(icebergSchema))
+            .project(icebergSchema)
+            .build()) {
+
+      List<Record> rows = Lists.newArrayList(reader);
+      assertThat(rows).hasSize(1);
+      assertThat(rows.get(0).getField("data")).isEqualTo("a");
+    }
+  }
+
   private Record readRecord(
      PlannedDataReader<Record> reader, Schema avroSchema, GenericRecord avroRecord)
       throws IOException {

Reply via email to