This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new f0e4fd2f55 Core: Add internal Avro reader (#11108)
f0e4fd2f55 is described below

commit f0e4fd2f557529eaa87c78d8d6585105e40b1f10
Author: Ryan Blue <[email protected]>
AuthorDate: Mon Oct 7 16:12:39 2024 -0700

    Core: Add internal Avro reader (#11108)
---
 .../java/org/apache/iceberg/ManifestReader.java    |  26 ++-
 .../org/apache/iceberg/avro/GenericAvroReader.java |  64 +-----
 .../org/apache/iceberg/avro/InternalReader.java    | 252 +++++++++++++++++++++
 .../org/apache/iceberg/avro/InternalReaders.java   | 110 +++++++++
 .../java/org/apache/iceberg/avro/ValueReaders.java |  98 ++++++++
 .../org/apache/iceberg/TestManifestReader.java     |   3 +-
 6 files changed, 483 insertions(+), 70 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index 6364603c59..cf04eb7c47 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -25,8 +25,10 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.avro.io.DatumReader;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.avro.AvroIterable;
+import org.apache.iceberg.avro.InternalReader;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Evaluator;
 import org.apache.iceberg.expressions.Expression;
@@ -65,16 +67,16 @@ public class ManifestReader<F extends ContentFile<F>> extends CloseableGroup
           "record_count");
 
   protected enum FileType {
-    DATA_FILES(GenericDataFile.class.getName()),
-    DELETE_FILES(GenericDeleteFile.class.getName());
+    DATA_FILES(GenericDataFile.class),
+    DELETE_FILES(GenericDeleteFile.class);
 
-    private final String fileClass;
+    private final Class<? extends StructLike> fileClass;
 
-    FileType(String fileClass) {
+    FileType(Class<? extends StructLike> fileClass) {
       this.fileClass = fileClass;
     }
 
-    private String fileClass() {
+    private Class<? extends StructLike> fileClass() {
       return fileClass;
     }
   }
@@ -261,12 +263,7 @@ public class ManifestReader<F extends ContentFile<F>> extends CloseableGroup
         AvroIterable<ManifestEntry<F>> reader =
             Avro.read(file)
                 .project(ManifestEntry.wrapFileSchema(Types.StructType.of(fields)))
-                .rename("manifest_entry", GenericManifestEntry.class.getName())
-                .rename("partition", PartitionData.class.getName())
-                .rename("r102", PartitionData.class.getName())
-                .rename("data_file", content.fileClass())
-                .rename("r2", content.fileClass())
-                .classLoader(GenericManifestEntry.class.getClassLoader())
+                .createResolvingReader(this::newReader)
                 .reuseContainers()
                 .build();
 
@@ -279,6 +276,13 @@ public class ManifestReader<F extends ContentFile<F>> extends CloseableGroup
     }
   }
 
+  private DatumReader<?> newReader(Schema schema) {
+    return InternalReader.create(schema)
+        .setRootType(GenericManifestEntry.class)
+        .setCustomType(ManifestEntry.DATA_FILE_ID, content.fileClass())
+        .setCustomType(DataFile.PARTITION_ID, PartitionData.class);
+  }
+
   CloseableIterable<ManifestEntry<F>> liveEntries() {
     return entries(true /* only live entries */);
   }
diff --git a/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java b/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java
index f630129dc5..bfdb65acf1 100644
--- a/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java
+++ b/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java
@@ -28,11 +28,8 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
-import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.common.DynClasses;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.Pair;
@@ -43,7 +40,7 @@ public class GenericAvroReader<T>
   private final Types.StructType expectedType;
   private ClassLoader loader = Thread.currentThread().getContextClassLoader();
   private Map<String, String> renames = ImmutableMap.of();
-  private final Map<Integer, ?> idToConstant = ImmutableMap.of();
+  private final Map<Integer, Object> idToConstant = ImmutableMap.of();
   private Schema fileSchema = null;
   private ValueReader<T> reader = null;
 
@@ -111,48 +108,13 @@ public class GenericAvroReader<T>
 
     @Override
     public ValueReader<?> record(Type partner, Schema record, List<ValueReader<?>> fieldResults) {
-      Types.StructType expected = partner != null ? partner.asStructType() : null;
-      Map<Integer, Integer> idToPos = idToPos(expected);
-
-      List<Pair<Integer, ValueReader<?>>> readPlan = Lists.newArrayList();
-      List<Schema.Field> fileFields = record.getFields();
-      for (int pos = 0; pos < fileFields.size(); pos += 1) {
-        Schema.Field field = fileFields.get(pos);
-        ValueReader<?> fieldReader = fieldResults.get(pos);
-        Integer fieldId = AvroSchemaUtil.fieldId(field);
-        Integer projectionPos = idToPos.remove(fieldId);
-
-        Object constant = idToConstant.get(fieldId);
-        if (projectionPos != null && constant != null) {
-          readPlan.add(
-              Pair.of(projectionPos, ValueReaders.replaceWithConstant(fieldReader, constant)));
-        } else {
-          readPlan.add(Pair.of(projectionPos, fieldReader));
-        }
+      if (partner == null) {
+        return ValueReaders.skipStruct(fieldResults);
       }
 
-      // handle any expected columns that are not in the data file
-      for (Map.Entry<Integer, Integer> idAndPos : idToPos.entrySet()) {
-        int fieldId = idAndPos.getKey();
-        int pos = idAndPos.getValue();
-
-        Object constant = idToConstant.get(fieldId);
-        Types.NestedField field = expected.field(fieldId);
-        if (constant != null) {
-          readPlan.add(Pair.of(pos, ValueReaders.constant(constant)));
-        } else if (field.initialDefault() != null) {
-          readPlan.add(Pair.of(pos, ValueReaders.constant(field.initialDefault())));
-        } else if (fieldId == MetadataColumns.IS_DELETED.fieldId()) {
-          readPlan.add(Pair.of(pos, ValueReaders.constant(false)));
-        } else if (fieldId == MetadataColumns.ROW_POSITION.fieldId()) {
-          readPlan.add(Pair.of(pos, ValueReaders.positions()));
-        } else if (field.isOptional()) {
-          readPlan.add(Pair.of(pos, ValueReaders.constant(null)));
-        } else {
-          throw new IllegalArgumentException(
-              String.format("Missing required field: %s", field.name()));
-        }
-      }
+      Types.StructType expected = partner.asStructType();
+      List<Pair<Integer, ValueReader<?>>> readPlan =
+          ValueReaders.buildReadPlan(expected, record, fieldResults, idToConstant);
 
       return recordReader(readPlan, avroSchemas.get(partner), record.getFullName());
     }
@@ -266,19 +228,5 @@ public class GenericAvroReader<T>
           throw new IllegalArgumentException("Unsupported type: " + primitive);
       }
     }
-
-    private Map<Integer, Integer> idToPos(Types.StructType struct) {
-      Map<Integer, Integer> idToPos = Maps.newHashMap();
-
-      if (struct != null) {
-        List<Types.NestedField> fields = struct.fields();
-        for (int pos = 0; pos < fields.size(); pos += 1) {
-          Types.NestedField field = fields.get(pos);
-          idToPos.put(field.fieldId(), pos);
-        }
-      }
-
-      return idToPos;
-    }
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/avro/InternalReader.java b/core/src/main/java/org/apache/iceberg/avro/InternalReader.java
new file mode 100644
index 0000000000..ca83ce2ba7
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/avro/InternalReader.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.avro;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
+
+/**
+ * A reader that produces Iceberg's internal in-memory object model.
+ *
+ * <p>Iceberg's internal in-memory object model produces the types defined in {@link
+ * Type.TypeID#javaClass()}.
+ *
+ * @param <T> Java type returned by the reader
+ */
+public class InternalReader<T> implements DatumReader<T>, SupportsRowPosition {
+  private static final int ROOT_ID = -1;
+
+  private final Types.StructType expectedType;
+  private final Map<Integer, Class<? extends StructLike>> typeMap = Maps.newHashMap();
+  private final Map<Integer, Object> idToConstant = ImmutableMap.of();
+  private Schema fileSchema = null;
+  private ValueReader<T> reader = null;
+
+  public static <D> InternalReader<D> create(org.apache.iceberg.Schema schema) {
+    return new InternalReader<>(schema);
+  }
+
+  InternalReader(org.apache.iceberg.Schema readSchema) {
+    this.expectedType = readSchema.asStruct();
+  }
+
+  @SuppressWarnings("unchecked")
+  private void initReader() {
+    this.reader =
+        (ValueReader<T>)
+            AvroWithPartnerVisitor.visit(
+                Pair.of(ROOT_ID, expectedType),
+                fileSchema,
+                new ResolvingReadBuilder(),
+                AccessByID.instance());
+  }
+
+  @Override
+  public void setSchema(Schema schema) {
+    this.fileSchema = schema;
+    initReader();
+  }
+
+  public InternalReader<T> setRootType(Class<? extends StructLike> rootClass) {
+    typeMap.put(ROOT_ID, rootClass);
+    return this;
+  }
+
+  public InternalReader<T> setCustomType(int fieldId, Class<? extends StructLike> structClass) {
+    typeMap.put(fieldId, structClass);
+    return this;
+  }
+
+  @Override
+  public void setRowPositionSupplier(Supplier<Long> posSupplier) {
+    if (reader instanceof SupportsRowPosition) {
+      ((SupportsRowPosition) reader).setRowPositionSupplier(posSupplier);
+    }
+  }
+
+  @Override
+  public T read(T reuse, Decoder decoder) throws IOException {
+    return reader.read(decoder, reuse);
+  }
+
+  private class ResolvingReadBuilder
+      extends AvroWithPartnerVisitor<Pair<Integer, Type>, ValueReader<?>> {
+    @Override
+    public ValueReader<?> record(
+        Pair<Integer, Type> partner, Schema record, List<ValueReader<?>> fieldResults) {
+      if (partner == null) {
+        return ValueReaders.skipStruct(fieldResults);
+      }
+
+      Types.StructType expected = partner.second().asStructType();
+      List<Pair<Integer, ValueReader<?>>> readPlan =
+          ValueReaders.buildReadPlan(expected, record, fieldResults, idToConstant);
+
+      return structReader(readPlan, partner.first(), expected);
+    }
+
+    private ValueReader<?> structReader(
+        List<Pair<Integer, ValueReader<?>>> readPlan, int fieldId, Types.StructType struct) {
+
+      Class<? extends StructLike> structClass = typeMap.get(fieldId);
+      if (structClass != null) {
+        return InternalReaders.struct(struct, structClass, readPlan);
+      } else {
+        return InternalReaders.struct(struct, readPlan);
+      }
+    }
+
+    @Override
+    public ValueReader<?> union(
+        Pair<Integer, Type> partner, Schema union, List<ValueReader<?>> options) {
+      return ValueReaders.union(options);
+    }
+
+    @Override
+    public ValueReader<?> arrayMap(
+        Pair<Integer, Type> partner,
+        Schema map,
+        ValueReader<?> keyReader,
+        ValueReader<?> valueReader) {
+      return ValueReaders.arrayMap(keyReader, valueReader);
+    }
+
+    @Override
+    public ValueReader<?> array(
+        Pair<Integer, Type> partner, Schema array, ValueReader<?> elementReader) {
+      return ValueReaders.array(elementReader);
+    }
+
+    @Override
+    public ValueReader<?> map(Pair<Integer, Type> partner, Schema map, ValueReader<?> valueReader) {
+      return ValueReaders.map(ValueReaders.strings(), valueReader);
+    }
+
+    @Override
+    public ValueReader<?> primitive(Pair<Integer, Type> partner, Schema primitive) {
+      LogicalType logicalType = primitive.getLogicalType();
+      if (logicalType != null) {
+        switch (logicalType.getName()) {
+          case "date":
+            return ValueReaders.ints();
+
+          case "time-micros":
+            return ValueReaders.longs();
+
+          case "timestamp-millis":
+            // adjust to microseconds
+            ValueReader<Long> longs = ValueReaders.longs();
+            return (ValueReader<Long>) (decoder, ignored) -> longs.read(decoder, null) * 1000L;
+
+          case "timestamp-micros":
+            return ValueReaders.longs();
+
+          case "decimal":
+            return ValueReaders.decimal(
+                ValueReaders.decimalBytesReader(primitive),
+                ((LogicalTypes.Decimal) logicalType).getScale());
+
+          case "uuid":
+            return ValueReaders.uuids();
+
+          default:
+            throw new IllegalArgumentException("Unknown logical type: " + logicalType);
+        }
+      }
+
+      switch (primitive.getType()) {
+        case NULL:
+          return ValueReaders.nulls();
+        case BOOLEAN:
+          return ValueReaders.booleans();
+        case INT:
+          if (partner != null && partner.second().typeId() == Type.TypeID.LONG) {
+            return ValueReaders.intsAsLongs();
+          }
+          return ValueReaders.ints();
+        case LONG:
+          return ValueReaders.longs();
+        case FLOAT:
+          if (partner != null && partner.second().typeId() == Type.TypeID.DOUBLE) {
+            return ValueReaders.floatsAsDoubles();
+          }
+          return ValueReaders.floats();
+        case DOUBLE:
+          return ValueReaders.doubles();
+        case STRING:
+          return ValueReaders.strings();
+        case FIXED:
+          return ValueReaders.fixed(primitive);
+        case BYTES:
+          return ValueReaders.byteBuffers();
+        case ENUM:
+          return ValueReaders.enums(primitive.getEnumSymbols());
+        default:
+          throw new IllegalArgumentException("Unsupported type: " + primitive);
+      }
+    }
+  }
+
+  private static class AccessByID
+      implements AvroWithPartnerVisitor.PartnerAccessors<Pair<Integer, Type>> {
+    private static final AccessByID INSTANCE = new AccessByID();
+
+    public static AccessByID instance() {
+      return INSTANCE;
+    }
+
+    @Override
+    public Pair<Integer, Type> fieldPartner(
+        Pair<Integer, Type> partner, Integer fieldId, String name) {
+      Types.NestedField field = partner.second().asStructType().field(fieldId);
+      return field != null ? Pair.of(field.fieldId(), field.type()) : null;
+    }
+
+    @Override
+    public Pair<Integer, Type> mapKeyPartner(Pair<Integer, Type> partner) {
+      Types.MapType map = partner.second().asMapType();
+      return Pair.of(map.keyId(), map.keyType());
+    }
+
+    @Override
+    public Pair<Integer, Type> mapValuePartner(Pair<Integer, Type> partner) {
+      Types.MapType map = partner.second().asMapType();
+      return Pair.of(map.valueId(), map.valueType());
+    }
+
+    @Override
+    public Pair<Integer, Type> listElementPartner(Pair<Integer, Type> partner) {
+      Types.ListType list = partner.second().asListType();
+      return Pair.of(list.elementId(), list.elementType());
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/avro/InternalReaders.java b/core/src/main/java/org/apache/iceberg/avro/InternalReaders.java
new file mode 100644
index 0000000000..6136bae052
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/avro/InternalReaders.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.avro;
+
+import java.util.List;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.common.DynConstructors;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
+
+class InternalReaders {
+  private InternalReaders() {}
+
+  static ValueReader<? extends Record> struct(
+      Types.StructType struct, List<Pair<Integer, ValueReader<?>>> readPlan) {
+    return new RecordReader(readPlan, struct);
+  }
+
+  static <S extends StructLike> ValueReader<S> struct(
+      Types.StructType struct, Class<S> structClass, List<Pair<Integer, ValueReader<?>>> readPlan) {
+    return new PlannedStructLikeReader<>(readPlan, struct, structClass);
+  }
+
+  private static class PlannedStructLikeReader<S extends StructLike>
+      extends ValueReaders.PlannedStructReader<S> {
+    private final Types.StructType structType;
+    private final Class<S> structClass;
+    private final DynConstructors.Ctor<S> ctor;
+
+    private PlannedStructLikeReader(
+        List<Pair<Integer, ValueReader<?>>> readPlan,
+        Types.StructType structType,
+        Class<S> structClass) {
+      super(readPlan);
+      this.structType = structType;
+      this.structClass = structClass;
+      this.ctor =
+          DynConstructors.builder(StructLike.class)
+              .hiddenImpl(structClass, Types.StructType.class)
+              .hiddenImpl(structClass)
+              .build();
+    }
+
+    @Override
+    protected S reuseOrCreate(Object reuse) {
+      if (structClass.isInstance(reuse)) {
+        return structClass.cast(reuse);
+      } else {
+        return ctor.newInstance(structType);
+      }
+    }
+
+    @Override
+    protected Object get(S struct, int pos) {
+      return struct.get(pos, Object.class);
+    }
+
+    @Override
+    protected void set(S struct, int pos, Object value) {
+      struct.set(pos, value);
+    }
+  }
+
+  private static class RecordReader extends ValueReaders.PlannedStructReader<GenericRecord> {
+    private final Types.StructType structType;
+
+    private RecordReader(
+        List<Pair<Integer, ValueReader<?>>> readPlan, Types.StructType structType) {
+      super(readPlan);
+      this.structType = structType;
+    }
+
+    @Override
+    protected GenericRecord reuseOrCreate(Object reuse) {
+      if (reuse instanceof GenericRecord) {
+        return (GenericRecord) reuse;
+      } else {
+        return GenericRecord.create(structType);
+      }
+    }
+
+    @Override
+    protected Object get(GenericRecord struct, int pos) {
+      return struct.get(pos);
+    }
+
+    @Override
+    protected void set(GenericRecord struct, int pos, Object value) {
+      struct.set(pos, value);
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
index d530bc1854..246671076c 100644
--- a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
+++ b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
@@ -181,6 +181,83 @@ public class ValueReaders {
     return new PlannedIndexedReader<>(recordSchema, recordClass, readPlan);
   }
 
+  public static ValueReader<Void> skipStruct(List<ValueReader<?>> readers) {
+    return new SkipStructReader(readers);
+  }
+
+  /**
+   * Builds a read plan for record classes that use planned reads instead of a ResolvingDecoder.
+   *
+   * @param expected expected StructType
+   * @param record Avro record schema
+   * @param fieldReaders list of readers for each field in the Avro record schema
+   * @param idToConstant a map of field ID to constants values
+   * @return a read plan that is a list of (position, reader) pairs
+   */
+  static List<Pair<Integer, ValueReader<?>>> buildReadPlan(
+      Types.StructType expected,
+      Schema record,
+      List<ValueReader<?>> fieldReaders,
+      Map<Integer, Object> idToConstant) {
+    Map<Integer, Integer> idToPos = idToPos(expected);
+
+    List<Pair<Integer, ValueReader<?>>> readPlan = Lists.newArrayList();
+    List<Schema.Field> fileFields = record.getFields();
+    for (int pos = 0; pos < fileFields.size(); pos += 1) {
+      Schema.Field field = fileFields.get(pos);
+      ValueReader<?> fieldReader = fieldReaders.get(pos);
+      Integer fieldId = AvroSchemaUtil.fieldId(field);
+      Integer projectionPos = idToPos.remove(fieldId);
+
+      Object constant = idToConstant.get(fieldId);
+      if (projectionPos != null && constant != null) {
+        readPlan.add(
+            Pair.of(projectionPos, ValueReaders.replaceWithConstant(fieldReader, constant)));
+      } else {
+        readPlan.add(Pair.of(projectionPos, fieldReader));
+      }
+    }
+
+    // handle any expected columns that are not in the data file
+    for (Map.Entry<Integer, Integer> idAndPos : idToPos.entrySet()) {
+      int fieldId = idAndPos.getKey();
+      int pos = idAndPos.getValue();
+
+      Object constant = idToConstant.get(fieldId);
+      Types.NestedField field = expected.field(fieldId);
+      if (constant != null) {
+        readPlan.add(Pair.of(pos, ValueReaders.constant(constant)));
+      } else if (field.initialDefault() != null) {
+        readPlan.add(Pair.of(pos, ValueReaders.constant(field.initialDefault())));
+      } else if (fieldId == MetadataColumns.IS_DELETED.fieldId()) {
+        readPlan.add(Pair.of(pos, ValueReaders.constant(false)));
+      } else if (fieldId == MetadataColumns.ROW_POSITION.fieldId()) {
+        readPlan.add(Pair.of(pos, ValueReaders.positions()));
+      } else if (field.isOptional()) {
+        readPlan.add(Pair.of(pos, ValueReaders.constant(null)));
+      } else {
+        throw new IllegalArgumentException(
+            String.format("Missing required field: %s", field.name()));
+      }
+    }
+
+    return readPlan;
+  }
+
+  private static Map<Integer, Integer> idToPos(Types.StructType struct) {
+    Map<Integer, Integer> idToPos = Maps.newHashMap();
+
+    if (struct != null) {
+      List<Types.NestedField> fields = struct.fields();
+      for (int pos = 0; pos < fields.size(); pos += 1) {
+        Types.NestedField field = fields.get(pos);
+        idToPos.put(field.fieldId(), pos);
+      }
+    }
+
+    return idToPos;
+  }
+
   private static class NullReader implements ValueReader<Object> {
     private static final NullReader INSTANCE = new NullReader();
 
@@ -777,6 +854,27 @@ public class ValueReaders {
     }
   }
 
+  private static class SkipStructReader implements ValueReader<Void> {
+    private final ValueReader<?>[] readers;
+
+    private SkipStructReader(List<ValueReader<?>> readers) {
+      this.readers = readers.toArray(ValueReader[]::new);
+    }
+
+    @Override
+    public Void read(Decoder decoder, Object reuse) throws IOException {
+      skip(decoder);
+      return null;
+    }
+
+    @Override
+    public void skip(Decoder decoder) throws IOException {
+      for (ValueReader<?> reader : readers) {
+        reader.skip(decoder);
+      }
+    }
+  }
+
   public abstract static class PlannedStructReader<S>
       implements ValueReader<S>, SupportsRowPosition {
     private final ValueReader<?>[] readers;
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestReader.java b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
index c9d0e29270..e45415f1f2 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestReader.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
@@ -44,7 +44,8 @@ public class TestManifestReader extends TestBase {
               "fileOrdinal",
               "fileSequenceNumber",
               "fromProjectionPos",
-              "manifestLocation")
+              "manifestLocation",
+              "partitionData.partitionType.fieldsById")
           .build();
 
   @TestTemplate

Reply via email to