This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
     new 49a9f9946a Avro: Reading files using DataFileStream with ROW LINEAGE if the column isn't projected (#15508)
49a9f9946a is described below
commit 49a9f9946a04121dcf7a9cb9c3f5c68837301dee
Author: Ayush Saxena <[email protected]>
AuthorDate: Mon Mar 16 23:33:00 2026 +0530
    Avro: Reading files using DataFileStream with ROW LINEAGE if the column isn't projected (#15508)
---
.../java/org/apache/iceberg/avro/ValueReaders.java | 7 ++-
.../iceberg/data/avro/TestPlannedDataReader.java | 64 ++++++++++++++++++++++
2 files changed, 69 insertions(+), 2 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
index 0ac98dabd4..ec46c56c72 100644
--- a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
+++ b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
@@ -254,7 +254,10 @@ public class ValueReaders {
Integer projectionPos,
ValueReader<?> fieldReader,
Map<Integer, ?> idToConstant) {
-    if (Objects.equals(fieldId, MetadataColumns.ROW_ID.fieldId())) {
+    if (projectionPos == null) {
+      // field is in the file but not projected; keep the reader only for skipping
+      return Pair.of(null, fieldReader);
+    } else if (Objects.equals(fieldId, MetadataColumns.ROW_ID.fieldId())) {
       Long firstRowId = (Long) idToConstant.get(fieldId);
       return Pair.of(projectionPos, ValueReaders.rowIds(firstRowId, fieldReader));
     } else if (Objects.equals(fieldId, MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId())) {
@@ -273,7 +276,7 @@ public class ValueReaders {
ValueReader<?> fieldReader,
Map<Integer, ?> idToConstant) {
     Object constant = idToConstant.get(fieldId);
-    if (projectionPos != null && constant != null) {
+    if (constant != null) {
       return Pair.of(projectionPos, ValueReaders.replaceWithConstant(fieldReader, constant));
     }
diff --git a/core/src/test/java/org/apache/iceberg/data/avro/TestPlannedDataReader.java b/core/src/test/java/org/apache/iceberg/data/avro/TestPlannedDataReader.java
index f9ff25d84b..a32a7548ae 100644
--- a/core/src/test/java/org/apache/iceberg/data/avro/TestPlannedDataReader.java
+++ b/core/src/test/java/org/apache/iceberg/data/avro/TestPlannedDataReader.java
@@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.File;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
@@ -33,6 +34,7 @@ import java.util.Map;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
+import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
@@ -40,7 +42,10 @@ import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
+import org.apache.iceberg.Files;
import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.avro.AvroIterable;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -305,6 +310,65 @@ public class TestPlannedDataReader {
.isEqualTo(10L);
}
+ @Test
+ public void testLineageColumnsNotProjected() throws Exception {
+
+ org.apache.iceberg.Schema icebergSchema =
+ new org.apache.iceberg.Schema(
+ Types.NestedField.required(1, "data", Types.StringType.get()));
+
+ Schema fileSchema =
+ SchemaBuilder.record("test")
+ .fields()
+ .name("data")
+ .type()
+ .stringType()
+ .noDefault()
+ .name(MetadataColumns.ROW_ID.name())
+ .type()
+ .optional()
+ .longType()
+ .name(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name())
+ .type()
+ .optional()
+ .longType()
+ .endRecord();
+
+ fileSchema.getField("data").addProp("field-id", 1);
+ fileSchema
+ .getField(MetadataColumns.ROW_ID.name())
+ .addProp("field-id", MetadataColumns.ROW_ID.fieldId());
+ fileSchema
+ .getField(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name())
+        .addProp("field-id", MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId());
+
+ File file = File.createTempFile("test", ".avro");
+
+ try (DataFileWriter<GenericRecord> writer =
+ new DataFileWriter<>(new GenericDatumWriter<>(fileSchema))) {
+
+ writer.create(fileSchema, file);
+
+ GenericRecord rec = new GenericData.Record(fileSchema);
+ rec.put("data", "a");
+ rec.put(MetadataColumns.ROW_ID.name(), 10L);
+ rec.put(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name(), 5L);
+
+ writer.append(rec);
+ }
+
+ try (AvroIterable<Record> reader =
+ Avro.read(Files.localInput(file))
+            .createResolvingReader(schema -> PlannedDataReader.create(icebergSchema))
+ .project(icebergSchema)
+ .build()) {
+
+ List<Record> rows = Lists.newArrayList(reader);
+ assertThat(rows).hasSize(1);
+ assertThat(rows.get(0).getField("data")).isEqualTo("a");
+ }
+ }
+
private Record readRecord(
       PlannedDataReader<Record> reader, Schema avroSchema, GenericRecord avroRecord)
throws IOException {