This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 232af57377 Core: Avro: Row Lineage Column (ROW_ID) is not populated correctly when the Data File doesn't have it (#15187)
232af57377 is described below
commit 232af57377bcd796c7d69d57854b39f996dfd9d0
Author: Ayush Saxena <[email protected]>
AuthorDate: Fri Feb 6 22:46:08 2026 +0530
Core: Avro: Row Lineage Column (ROW_ID) is not populated correctly when the Data File doesn't have it (#15187)
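The rule at stake (per Iceberg's row-lineage semantics, and what the new tests assert): when a data file carries no materialized _row_id value for a row, the row inherits the file's first_row_id plus its position in the file; a materialized value, when present, wins. A minimal sketch of that rule in plain Java, where resolveRowId is a hypothetical helper for illustration and not Iceberg's API:

class RowIdInheritanceSketch {
  // Hypothetical helper: resolve the _row_id for a single row.
  static Long resolveRowId(Long materializedRowId, Long firstRowId, long position) {
    if (materializedRowId != null) {
      return materializedRowId; // value stored in the data file takes precedence
    }
    if (firstRowId == null) {
      return null; // file has no first_row_id assigned; nothing to inherit
    }
    return firstRowId + position; // inherited ID advances with row position
  }

  public static void main(String[] args) {
    // Mirrors the tests below, where first_row_id = 1000:
    System.out.println(resolveRowId(null, 1000L, 0)); // 1000
    System.out.println(resolveRowId(null, 1000L, 1)); // 1001
    System.out.println(resolveRowId(555L, 1000L, 0)); // 555
  }
}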
---
.../java/org/apache/iceberg/avro/ValueReaders.java | 3 +
.../iceberg/data/avro/TestPlannedDataReader.java | 139 ++++++++++++++++++++-
2 files changed, 140 insertions(+), 2 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
index 3ac755bb4a..0ac98dabd4 100644
--- a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
+++ b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
@@ -305,6 +305,9 @@ public class ValueReaders {
Types.NestedField field = expected.field(fieldId);
if (constant != null) {
+ if (fieldId == MetadataColumns.ROW_ID.fieldId()) {
+ return ValueReaders.rowIds((Long) constant, null);
+ }
return ValueReaders.constant(constant);
} else if (field.initialDefault() != null) {
return ValueReaders.constant(convert.apply(field.type(), field.initialDefault()));
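Before this change, the constant branch above returned ValueReaders.constant(constant) for every field, so a file without a materialized _row_id column had the file's first_row_id repeated on every row. Special-casing ROW_ID through ValueReaders.rowIds((Long) constant, null) lets the reader advance the ID with the row position instead, which the tests below verify: with first_row_id 1000, positions 0 and 1 read back as 1000 and 1001, while LAST_UPDATED_SEQUENCE_NUMBER correctly stays constant.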
diff --git a/core/src/test/java/org/apache/iceberg/data/avro/TestPlannedDataReader.java b/core/src/test/java/org/apache/iceberg/data/avro/TestPlannedDataReader.java
index 438a9202cc..f9ff25d84b 100644
--- a/core/src/test/java/org/apache/iceberg/data/avro/TestPlannedDataReader.java
+++ b/core/src/test/java/org/apache/iceberg/data/avro/TestPlannedDataReader.java
@@ -27,16 +27,24 @@ import java.io.IOException;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
+import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.data.Record;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;
import org.junit.jupiter.api.Test;
@@ -186,19 +194,146 @@ public class TestPlannedDataReader {
.isEqualTo(preEpochTimestampMillis.withOffsetSameInstant(ZoneOffset.UTC));
}
+ @Test
+ public void testRowLineageInjectedWithPlannedReader() throws IOException {
+ org.apache.iceberg.Schema icebergSchema =
+ new org.apache.iceberg.Schema(
+ Types.NestedField.required(1, "data", Types.StringType.get()),
+ MetadataColumns.ROW_ID,
+ MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER);
+
+ Schema avroFileSchema =
+ SchemaBuilder.record("test")
+ .fields()
+ .name("data")
+ .type()
+ .stringType()
+ .noDefault()
+ .endRecord();
+ avroFileSchema.getField("data").addProp("field-id", 1);
+
+ Map<Integer, Object> idToConstant =
+ ImmutableMap.of(
+ MetadataColumns.ROW_ID.fieldId(), 1000L,
+ MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId(), 5L);
+
+ PlannedDataReader<Record> reader = PlannedDataReader.create(icebergSchema, idToConstant);
+ reader.setSchema(avroFileSchema);
+ reader.setRowPositionSupplier(() -> 0L);
+
+ GenericRecord rec1 = new GenericData.Record(avroFileSchema);
+ rec1.put("data", "a");
+ GenericRecord rec2 = new GenericData.Record(avroFileSchema);
+ rec2.put("data", "b");
+
+ List<Record> results = readRecords(reader, avroFileSchema, Lists.newArrayList(rec1, rec2));
+
+ assertThat(results).hasSize(2);
+
+ assertThat(results.get(0).getField(MetadataColumns.ROW_ID.name())).isEqualTo(1000L);
+ assertThat(results.get(0).getField(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name()))
+ .isEqualTo(5L);
+ assertThat(results.get(1).getField(MetadataColumns.ROW_ID.name())).isEqualTo(1001L);
+ assertThat(results.get(1).getField(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name()))
+ .isEqualTo(5L);
+ }
+
+ @Test
+ public void testMixedRowLineageValues() throws IOException {
+ org.apache.iceberg.Schema icebergSchema =
+ new org.apache.iceberg.Schema(
+ Types.NestedField.required(1, "data", Types.StringType.get()),
+ MetadataColumns.ROW_ID,
+ MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER);
+
+ Schema avroFileSchema =
+ SchemaBuilder.record("test")
+ .fields()
+ .name("data")
+ .type()
+ .stringType()
+ .noDefault()
+ .name(MetadataColumns.ROW_ID.name())
+ .type()
+ .optional()
+ .longType()
+ .name(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name())
+ .type()
+ .optional()
+ .longType()
+ .endRecord();
+
+ avroFileSchema.getField("data").addProp("field-id", 1);
+ avroFileSchema
+ .getField(MetadataColumns.ROW_ID.name())
+ .addProp("field-id", MetadataColumns.ROW_ID.fieldId());
+ avroFileSchema
+ .getField(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name())
+ .addProp("field-id",
MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId());
+
+ Map<Integer, Object> idToConstant =
+ ImmutableMap.of(
+ MetadataColumns.ROW_ID.fieldId(), 1000L,
+ MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId(), 10L);
+
+ PlannedDataReader<Record> reader = PlannedDataReader.create(icebergSchema, idToConstant);
+ reader.setSchema(avroFileSchema);
+ reader.setRowPositionSupplier(() -> 0L);
+
+ GenericRecord recWithRowLineageValues = new GenericData.Record(avroFileSchema);
+ recWithRowLineageValues.put("data", "has_values");
+ recWithRowLineageValues.put(MetadataColumns.ROW_ID.name(), 555L);
+ recWithRowLineageValues.put(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name(), 7L);
+
+ GenericRecord recWithoutRowLineageValues = new GenericData.Record(avroFileSchema);
+ recWithoutRowLineageValues.put("data", "has_nulls");
+ recWithoutRowLineageValues.put(MetadataColumns.ROW_ID.name(), null);
+ recWithoutRowLineageValues.put(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name(), null);
+
+ List<Record> results =
+ readRecords(
+ reader,
+ avroFileSchema,
+ Lists.newArrayList(recWithRowLineageValues, recWithoutRowLineageValues));
+
+ assertThat(results.get(0).getField(MetadataColumns.ROW_ID.name())).isEqualTo(555L);
+ assertThat(results.get(0).getField(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name()))
+ .isEqualTo(7L);
+
+ assertThat(results.get(1).getField(MetadataColumns.ROW_ID.name())).isEqualTo(1001L);
+ assertThat(results.get(1).getField(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name()))
+ .isEqualTo(10L);
+ }
+
private Record readRecord(
PlannedDataReader<Record> reader, Schema avroSchema, GenericRecord avroRecord)
throws IOException {
+ return Iterables.getOnlyElement(
+ readRecords(reader, avroSchema, Collections.singletonList(avroRecord)));
+ }
+
+ private List<Record> readRecords(
+ PlannedDataReader<Record> reader, Schema avroSchema, List<GenericRecord> avroRecords)
+ throws IOException {
+ List<Record> results = Lists.newArrayList();
+
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(avroSchema);
- writer.write(avroRecord, encoder);
+
+ for (GenericRecord rec : avroRecords) {
+ writer.write(rec, encoder);
+ }
encoder.flush();
try (ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray())) {
- return reader.read(null, DecoderFactory.get().binaryDecoder(in, null));
+ BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null);
+ for (int i = 0; i < avroRecords.size(); i++) {
+ results.add(reader.read(null, decoder));
+ }
}
}
+ return results;
}
private Schema utcAdjustedLongSchema() {