This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 232af57377 Core: Avro: Row Lineage Column (ROW_ID) is not populated correctly in case the Data File doesn't have them (#15187)
232af57377 is described below

commit 232af57377bcd796c7d69d57854b39f996dfd9d0
Author: Ayush Saxena <[email protected]>
AuthorDate: Fri Feb 6 22:46:08 2026 +0530

    Core: Avro: Row Lineage Column (ROW_ID) is not populated correctly in case the Data File doesn't have them (#15187)
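    
    An illustrative, standalone sketch (not part of this patch) of the row-id
    semantics the new tests expect: when the data file has no _row_id column,
    each row receives the first-row-id constant plus its position in the file;
    when the file already carries a value, that value is kept. The assignRowId
    helper below is hypothetical, for illustration only, not an Iceberg API.
    
        // Hypothetical sketch of the expected row-id assignment.
        public class RowIdAssignmentSketch {
    
          // A row keeps the _row_id read from the data file when present;
          // otherwise it is derived from firstRowId plus the row position.
          static long assignRowId(Long rowIdFromFile, long firstRowId, long rowPosition) {
            return rowIdFromFile != null ? rowIdFromFile : firstRowId + rowPosition;
          }
    
          public static void main(String[] args) {
            long firstRowId = 1000L;
    
            // File without a _row_id column: rows 0 and 1 become 1000 and 1001.
            System.out.println(assignRowId(null, firstRowId, 0L)); // 1000
            System.out.println(assignRowId(null, firstRowId, 1L)); // 1001
    
            // File that already carries a _row_id value keeps it.
            System.out.println(assignRowId(555L, firstRowId, 0L)); // 555
          }
        }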
---
 .../java/org/apache/iceberg/avro/ValueReaders.java |   3 +
 .../iceberg/data/avro/TestPlannedDataReader.java   | 139 ++++++++++++++++++++-
 2 files changed, 140 insertions(+), 2 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
index 3ac755bb4a..0ac98dabd4 100644
--- a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
+++ b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
@@ -305,6 +305,9 @@ public class ValueReaders {
     Types.NestedField field = expected.field(fieldId);
 
     if (constant != null) {
+      if (fieldId == MetadataColumns.ROW_ID.fieldId()) {
+        return ValueReaders.rowIds((Long) constant, null);
+      }
       return ValueReaders.constant(constant);
     } else if (field.initialDefault() != null) {
       return ValueReaders.constant(convert.apply(field.type(), field.initialDefault()));
diff --git a/core/src/test/java/org/apache/iceberg/data/avro/TestPlannedDataReader.java b/core/src/test/java/org/apache/iceberg/data/avro/TestPlannedDataReader.java
index 438a9202cc..f9ff25d84b 100644
--- a/core/src/test/java/org/apache/iceberg/data/avro/TestPlannedDataReader.java
+++ b/core/src/test/java/org/apache/iceberg/data/avro/TestPlannedDataReader.java
@@ -27,16 +27,24 @@ import java.io.IOException;
 import java.time.LocalDateTime;
 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.EncoderFactory;
+import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.data.Record;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.DateTimeUtil;
 import org.junit.jupiter.api.Test;
@@ -186,19 +194,146 @@ public class TestPlannedDataReader {
         .isEqualTo(preEpochTimestampMillis.withOffsetSameInstant(ZoneOffset.UTC));
   }
 
+  @Test
+  public void testRowLineageInjectedWithPlannedReader() throws IOException {
+    org.apache.iceberg.Schema icebergSchema =
+        new org.apache.iceberg.Schema(
+            Types.NestedField.required(1, "data", Types.StringType.get()),
+            MetadataColumns.ROW_ID,
+            MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER);
+
+    Schema avroFileSchema =
+        SchemaBuilder.record("test")
+            .fields()
+            .name("data")
+            .type()
+            .stringType()
+            .noDefault()
+            .endRecord();
+    avroFileSchema.getField("data").addProp("field-id", 1);
+
+    Map<Integer, Object> idToConstant =
+        ImmutableMap.of(
+            MetadataColumns.ROW_ID.fieldId(), 1000L,
+            MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId(), 5L);
+
+    PlannedDataReader<Record> reader = PlannedDataReader.create(icebergSchema, idToConstant);
+    reader.setSchema(avroFileSchema);
+    reader.setRowPositionSupplier(() -> 0L);
+
+    GenericRecord rec1 = new GenericData.Record(avroFileSchema);
+    rec1.put("data", "a");
+    GenericRecord rec2 = new GenericData.Record(avroFileSchema);
+    rec2.put("data", "b");
+
+    List<Record> results = readRecords(reader, avroFileSchema, Lists.newArrayList(rec1, rec2));
+
+    assertThat(results).hasSize(2);
+
+    assertThat(results.get(0).getField(MetadataColumns.ROW_ID.name())).isEqualTo(1000L);
+    assertThat(results.get(0).getField(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name()))
+        .isEqualTo(5L);
+    assertThat(results.get(1).getField(MetadataColumns.ROW_ID.name())).isEqualTo(1001L);
+    assertThat(results.get(1).getField(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name()))
+        .isEqualTo(5L);
+  }
+
+  @Test
+  public void testMixedRowLineageValues() throws IOException {
+    org.apache.iceberg.Schema icebergSchema =
+        new org.apache.iceberg.Schema(
+            Types.NestedField.required(1, "data", Types.StringType.get()),
+            MetadataColumns.ROW_ID,
+            MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER);
+
+    Schema avroFileSchema =
+        SchemaBuilder.record("test")
+            .fields()
+            .name("data")
+            .type()
+            .stringType()
+            .noDefault()
+            .name(MetadataColumns.ROW_ID.name())
+            .type()
+            .optional()
+            .longType()
+            .name(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name())
+            .type()
+            .optional()
+            .longType()
+            .endRecord();
+
+    avroFileSchema.getField("data").addProp("field-id", 1);
+    avroFileSchema
+        .getField(MetadataColumns.ROW_ID.name())
+        .addProp("field-id", MetadataColumns.ROW_ID.fieldId());
+    avroFileSchema
+        .getField(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name())
+        .addProp("field-id", MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId());
+
+    Map<Integer, Object> idToConstant =
+        ImmutableMap.of(
+            MetadataColumns.ROW_ID.fieldId(), 1000L,
+            MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId(), 10L);
+
+    PlannedDataReader<Record> reader = PlannedDataReader.create(icebergSchema, idToConstant);
+    reader.setSchema(avroFileSchema);
+    reader.setRowPositionSupplier(() -> 0L);
+
+    GenericRecord recWithRowLineageValues = new GenericData.Record(avroFileSchema);
+    recWithRowLineageValues.put("data", "has_values");
+    recWithRowLineageValues.put(MetadataColumns.ROW_ID.name(), 555L);
+    recWithRowLineageValues.put(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name(), 7L);
+
+    GenericRecord recWithoutRowLineageValues = new GenericData.Record(avroFileSchema);
+    recWithoutRowLineageValues.put("data", "has_nulls");
+    recWithoutRowLineageValues.put(MetadataColumns.ROW_ID.name(), null);
+    recWithoutRowLineageValues.put(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name(), null);
+
+    List<Record> results =
+        readRecords(
+            reader,
+            avroFileSchema,
+            Lists.newArrayList(recWithRowLineageValues, recWithoutRowLineageValues));
+
+    assertThat(results.get(0).getField(MetadataColumns.ROW_ID.name())).isEqualTo(555L);
+    assertThat(results.get(0).getField(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name()))
+        .isEqualTo(7L);
+
+    assertThat(results.get(1).getField(MetadataColumns.ROW_ID.name())).isEqualTo(1001L);
+    assertThat(results.get(1).getField(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name()))
+        .isEqualTo(10L);
+  }
+
   private Record readRecord(
       PlannedDataReader<Record> reader, Schema avroSchema, GenericRecord avroRecord)
       throws IOException {
+    return Iterables.getOnlyElement(
+        readRecords(reader, avroSchema, Collections.singletonList(avroRecord)));
+  }
+
+  private List<Record> readRecords(
+      PlannedDataReader<Record> reader, Schema avroSchema, List<GenericRecord> avroRecords)
+      throws IOException {
+    List<Record> results = Lists.newArrayList();
+
     try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
       BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
       GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(avroSchema);
-      writer.write(avroRecord, encoder);
+
+      for (GenericRecord rec : avroRecords) {
+        writer.write(rec, encoder);
+      }
       encoder.flush();
 
       try (ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray())) {
-        return reader.read(null, DecoderFactory.get().binaryDecoder(in, null));
+        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null);
+        for (int i = 0; i < avroRecords.size(); i++) {
+          results.add(reader.read(null, decoder));
+        }
       }
     }
+    return results;
   }
 
   private Schema utcAdjustedLongSchema() {
