This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new f0e4fd2f55 Core: Add internal Avro reader (#11108)
f0e4fd2f55 is described below
commit f0e4fd2f557529eaa87c78d8d6585105e40b1f10
Author: Ryan Blue <[email protected]>
AuthorDate: Mon Oct 7 16:12:39 2024 -0700
Core: Add internal Avro reader (#11108)
---
.../java/org/apache/iceberg/ManifestReader.java | 26 ++-
.../org/apache/iceberg/avro/GenericAvroReader.java | 64 +-----
.../org/apache/iceberg/avro/InternalReader.java | 252 +++++++++++++++++++++
.../org/apache/iceberg/avro/InternalReaders.java | 110 +++++++++
.../java/org/apache/iceberg/avro/ValueReaders.java | 98 ++++++++
.../org/apache/iceberg/TestManifestReader.java | 3 +-
6 files changed, 483 insertions(+), 70 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index 6364603c59..cf04eb7c47 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -25,8 +25,10 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.avro.io.DatumReader;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.avro.AvroIterable;
+import org.apache.iceberg.avro.InternalReader;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
@@ -65,16 +67,16 @@ public class ManifestReader<F extends ContentFile<F>> extends CloseableGroup
"record_count");
protected enum FileType {
- DATA_FILES(GenericDataFile.class.getName()),
- DELETE_FILES(GenericDeleteFile.class.getName());
+ DATA_FILES(GenericDataFile.class),
+ DELETE_FILES(GenericDeleteFile.class);
- private final String fileClass;
+ private final Class<? extends StructLike> fileClass;
- FileType(String fileClass) {
+ FileType(Class<? extends StructLike> fileClass) {
this.fileClass = fileClass;
}
- private String fileClass() {
+ private Class<? extends StructLike> fileClass() {
return fileClass;
}
}
@@ -261,12 +263,7 @@ public class ManifestReader<F extends ContentFile<F>> extends CloseableGroup
AvroIterable<ManifestEntry<F>> reader =
Avro.read(file)
.project(ManifestEntry.wrapFileSchema(Types.StructType.of(fields)))
- .rename("manifest_entry", GenericManifestEntry.class.getName())
- .rename("partition", PartitionData.class.getName())
- .rename("r102", PartitionData.class.getName())
- .rename("data_file", content.fileClass())
- .rename("r2", content.fileClass())
- .classLoader(GenericManifestEntry.class.getClassLoader())
+ .createResolvingReader(this::newReader)
.reuseContainers()
.build();
@@ -279,6 +276,13 @@ public class ManifestReader<F extends ContentFile<F>> extends CloseableGroup
}
}
+ private DatumReader<?> newReader(Schema schema) {
+ return InternalReader.create(schema)
+ .setRootType(GenericManifestEntry.class)
+ .setCustomType(ManifestEntry.DATA_FILE_ID, content.fileClass())
+ .setCustomType(DataFile.PARTITION_ID, PartitionData.class);
+ }
+
CloseableIterable<ManifestEntry<F>> liveEntries() {
return entries(true /* only live entries */);
}
diff --git a/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java b/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java
index f630129dc5..bfdb65acf1 100644
--- a/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java
+++ b/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java
@@ -28,11 +28,8 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
-import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.common.DynClasses;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
@@ -43,7 +40,7 @@ public class GenericAvroReader<T>
private final Types.StructType expectedType;
private ClassLoader loader = Thread.currentThread().getContextClassLoader();
private Map<String, String> renames = ImmutableMap.of();
- private final Map<Integer, ?> idToConstant = ImmutableMap.of();
+ private final Map<Integer, Object> idToConstant = ImmutableMap.of();
private Schema fileSchema = null;
private ValueReader<T> reader = null;
@@ -111,48 +108,13 @@ public class GenericAvroReader<T>
@Override
public ValueReader<?> record(Type partner, Schema record,
List<ValueReader<?>> fieldResults) {
- Types.StructType expected = partner != null ? partner.asStructType() : null;
- Map<Integer, Integer> idToPos = idToPos(expected);
-
- List<Pair<Integer, ValueReader<?>>> readPlan = Lists.newArrayList();
- List<Schema.Field> fileFields = record.getFields();
- for (int pos = 0; pos < fileFields.size(); pos += 1) {
- Schema.Field field = fileFields.get(pos);
- ValueReader<?> fieldReader = fieldResults.get(pos);
- Integer fieldId = AvroSchemaUtil.fieldId(field);
- Integer projectionPos = idToPos.remove(fieldId);
-
- Object constant = idToConstant.get(fieldId);
- if (projectionPos != null && constant != null) {
- readPlan.add(
- Pair.of(projectionPos, ValueReaders.replaceWithConstant(fieldReader, constant)));
- } else {
- readPlan.add(Pair.of(projectionPos, fieldReader));
- }
+ if (partner == null) {
+ return ValueReaders.skipStruct(fieldResults);
}
- // handle any expected columns that are not in the data file
- for (Map.Entry<Integer, Integer> idAndPos : idToPos.entrySet()) {
- int fieldId = idAndPos.getKey();
- int pos = idAndPos.getValue();
-
- Object constant = idToConstant.get(fieldId);
- Types.NestedField field = expected.field(fieldId);
- if (constant != null) {
- readPlan.add(Pair.of(pos, ValueReaders.constant(constant)));
- } else if (field.initialDefault() != null) {
- readPlan.add(Pair.of(pos, ValueReaders.constant(field.initialDefault())));
- } else if (fieldId == MetadataColumns.IS_DELETED.fieldId()) {
- readPlan.add(Pair.of(pos, ValueReaders.constant(false)));
- } else if (fieldId == MetadataColumns.ROW_POSITION.fieldId()) {
- readPlan.add(Pair.of(pos, ValueReaders.positions()));
- } else if (field.isOptional()) {
- readPlan.add(Pair.of(pos, ValueReaders.constant(null)));
- } else {
- throw new IllegalArgumentException(
- String.format("Missing required field: %s", field.name()));
- }
- }
+ Types.StructType expected = partner.asStructType();
+ List<Pair<Integer, ValueReader<?>>> readPlan =
+ ValueReaders.buildReadPlan(expected, record, fieldResults, idToConstant);
return recordReader(readPlan, avroSchemas.get(partner),
record.getFullName());
}
@@ -266,19 +228,5 @@ public class GenericAvroReader<T>
throw new IllegalArgumentException("Unsupported type: " + primitive);
}
}
-
- private Map<Integer, Integer> idToPos(Types.StructType struct) {
- Map<Integer, Integer> idToPos = Maps.newHashMap();
-
- if (struct != null) {
- List<Types.NestedField> fields = struct.fields();
- for (int pos = 0; pos < fields.size(); pos += 1) {
- Types.NestedField field = fields.get(pos);
- idToPos.put(field.fieldId(), pos);
- }
- }
-
- return idToPos;
- }
}
}
diff --git a/core/src/main/java/org/apache/iceberg/avro/InternalReader.java b/core/src/main/java/org/apache/iceberg/avro/InternalReader.java
new file mode 100644
index 0000000000..ca83ce2ba7
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/avro/InternalReader.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.avro;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
+
+/**
+ * A reader that produces Iceberg's internal in-memory object model.
+ *
+ * <p>Iceberg's internal in-memory object model produces the types defined in {@link
+ * Type.TypeID#javaClass()}.
+ *
+ * @param <T> Java type returned by the reader
+ */
+public class InternalReader<T> implements DatumReader<T>, SupportsRowPosition {
+ private static final int ROOT_ID = -1;
+
+ private final Types.StructType expectedType;
+ private final Map<Integer, Class<? extends StructLike>> typeMap = Maps.newHashMap();
+ private final Map<Integer, Object> idToConstant = ImmutableMap.of();
+ private Schema fileSchema = null;
+ private ValueReader<T> reader = null;
+
+ public static <D> InternalReader<D> create(org.apache.iceberg.Schema schema) {
+ return new InternalReader<>(schema);
+ }
+
+ InternalReader(org.apache.iceberg.Schema readSchema) {
+ this.expectedType = readSchema.asStruct();
+ }
+
+ @SuppressWarnings("unchecked")
+ private void initReader() {
+ this.reader =
+ (ValueReader<T>)
+ AvroWithPartnerVisitor.visit(
+ Pair.of(ROOT_ID, expectedType),
+ fileSchema,
+ new ResolvingReadBuilder(),
+ AccessByID.instance());
+ }
+
+ @Override
+ public void setSchema(Schema schema) {
+ this.fileSchema = schema;
+ initReader();
+ }
+
+ public InternalReader<T> setRootType(Class<? extends StructLike> rootClass) {
+ typeMap.put(ROOT_ID, rootClass);
+ return this;
+ }
+
+ public InternalReader<T> setCustomType(int fieldId, Class<? extends StructLike> structClass) {
+ typeMap.put(fieldId, structClass);
+ return this;
+ }
+
+ @Override
+ public void setRowPositionSupplier(Supplier<Long> posSupplier) {
+ if (reader instanceof SupportsRowPosition) {
+ ((SupportsRowPosition) reader).setRowPositionSupplier(posSupplier);
+ }
+ }
+
+ @Override
+ public T read(T reuse, Decoder decoder) throws IOException {
+ return reader.read(decoder, reuse);
+ }
+
+ private class ResolvingReadBuilder
+ extends AvroWithPartnerVisitor<Pair<Integer, Type>, ValueReader<?>> {
+ @Override
+ public ValueReader<?> record(
+ Pair<Integer, Type> partner, Schema record, List<ValueReader<?>> fieldResults) {
+ if (partner == null) {
+ return ValueReaders.skipStruct(fieldResults);
+ }
+
+ Types.StructType expected = partner.second().asStructType();
+ List<Pair<Integer, ValueReader<?>>> readPlan =
+ ValueReaders.buildReadPlan(expected, record, fieldResults, idToConstant);
+
+ return structReader(readPlan, partner.first(), expected);
+ }
+
+ private ValueReader<?> structReader(
+ List<Pair<Integer, ValueReader<?>>> readPlan, int fieldId, Types.StructType struct) {
+
+ Class<? extends StructLike> structClass = typeMap.get(fieldId);
+ if (structClass != null) {
+ return InternalReaders.struct(struct, structClass, readPlan);
+ } else {
+ return InternalReaders.struct(struct, readPlan);
+ }
+ }
+
+ @Override
+ public ValueReader<?> union(
+ Pair<Integer, Type> partner, Schema union, List<ValueReader<?>> options) {
+ return ValueReaders.union(options);
+ }
+
+ @Override
+ public ValueReader<?> arrayMap(
+ Pair<Integer, Type> partner,
+ Schema map,
+ ValueReader<?> keyReader,
+ ValueReader<?> valueReader) {
+ return ValueReaders.arrayMap(keyReader, valueReader);
+ }
+
+ @Override
+ public ValueReader<?> array(
+ Pair<Integer, Type> partner, Schema array, ValueReader<?> elementReader) {
+ return ValueReaders.array(elementReader);
+ }
+
+ @Override
+ public ValueReader<?> map(Pair<Integer, Type> partner, Schema map, ValueReader<?> valueReader) {
+ return ValueReaders.map(ValueReaders.strings(), valueReader);
+ }
+
+ @Override
+ public ValueReader<?> primitive(Pair<Integer, Type> partner, Schema primitive) {
+ LogicalType logicalType = primitive.getLogicalType();
+ if (logicalType != null) {
+ switch (logicalType.getName()) {
+ case "date":
+ return ValueReaders.ints();
+
+ case "time-micros":
+ return ValueReaders.longs();
+
+ case "timestamp-millis":
+ // adjust to microseconds
+ ValueReader<Long> longs = ValueReaders.longs();
+ return (ValueReader<Long>) (decoder, ignored) -> longs.read(decoder, null) * 1000L;
+
+ case "timestamp-micros":
+ return ValueReaders.longs();
+
+ case "decimal":
+ return ValueReaders.decimal(
+ ValueReaders.decimalBytesReader(primitive),
+ ((LogicalTypes.Decimal) logicalType).getScale());
+
+ case "uuid":
+ return ValueReaders.uuids();
+
+ default:
+ throw new IllegalArgumentException("Unknown logical type: " + logicalType);
+ }
+ }
+
+ switch (primitive.getType()) {
+ case NULL:
+ return ValueReaders.nulls();
+ case BOOLEAN:
+ return ValueReaders.booleans();
+ case INT:
+ if (partner != null && partner.second().typeId() == Type.TypeID.LONG) {
+ return ValueReaders.intsAsLongs();
+ }
+ return ValueReaders.ints();
+ case LONG:
+ return ValueReaders.longs();
+ case FLOAT:
+ if (partner != null && partner.second().typeId() == Type.TypeID.DOUBLE) {
+ return ValueReaders.floatsAsDoubles();
+ }
+ return ValueReaders.floats();
+ case DOUBLE:
+ return ValueReaders.doubles();
+ case STRING:
+ return ValueReaders.strings();
+ case FIXED:
+ return ValueReaders.fixed(primitive);
+ case BYTES:
+ return ValueReaders.byteBuffers();
+ case ENUM:
+ return ValueReaders.enums(primitive.getEnumSymbols());
+ default:
+ throw new IllegalArgumentException("Unsupported type: " + primitive);
+ }
+ }
+ }
+
+ private static class AccessByID
+ implements AvroWithPartnerVisitor.PartnerAccessors<Pair<Integer, Type>> {
+ private static final AccessByID INSTANCE = new AccessByID();
+
+ public static AccessByID instance() {
+ return INSTANCE;
+ }
+
+ @Override
+ public Pair<Integer, Type> fieldPartner(
+ Pair<Integer, Type> partner, Integer fieldId, String name) {
+ Types.NestedField field = partner.second().asStructType().field(fieldId);
+ return field != null ? Pair.of(field.fieldId(), field.type()) : null;
+ }
+
+ @Override
+ public Pair<Integer, Type> mapKeyPartner(Pair<Integer, Type> partner) {
+ Types.MapType map = partner.second().asMapType();
+ return Pair.of(map.keyId(), map.keyType());
+ }
+
+ @Override
+ public Pair<Integer, Type> mapValuePartner(Pair<Integer, Type> partner) {
+ Types.MapType map = partner.second().asMapType();
+ return Pair.of(map.valueId(), map.valueType());
+ }
+
+ @Override
+ public Pair<Integer, Type> listElementPartner(Pair<Integer, Type> partner) {
+ Types.ListType list = partner.second().asListType();
+ return Pair.of(list.elementId(), list.elementType());
+ }
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/avro/InternalReaders.java b/core/src/main/java/org/apache/iceberg/avro/InternalReaders.java
new file mode 100644
index 0000000000..6136bae052
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/avro/InternalReaders.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.avro;
+
+import java.util.List;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.common.DynConstructors;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
+
+class InternalReaders {
+ private InternalReaders() {}
+
+ static ValueReader<? extends Record> struct(
+ Types.StructType struct, List<Pair<Integer, ValueReader<?>>> readPlan) {
+ return new RecordReader(readPlan, struct);
+ }
+
+ static <S extends StructLike> ValueReader<S> struct(
+ Types.StructType struct, Class<S> structClass, List<Pair<Integer, ValueReader<?>>> readPlan) {
+ return new PlannedStructLikeReader<>(readPlan, struct, structClass);
+ }
+
+ private static class PlannedStructLikeReader<S extends StructLike>
+ extends ValueReaders.PlannedStructReader<S> {
+ private final Types.StructType structType;
+ private final Class<S> structClass;
+ private final DynConstructors.Ctor<S> ctor;
+
+ private PlannedStructLikeReader(
+ List<Pair<Integer, ValueReader<?>>> readPlan,
+ Types.StructType structType,
+ Class<S> structClass) {
+ super(readPlan);
+ this.structType = structType;
+ this.structClass = structClass;
+ this.ctor =
+ DynConstructors.builder(StructLike.class)
+ .hiddenImpl(structClass, Types.StructType.class)
+ .hiddenImpl(structClass)
+ .build();
+ }
+
+ @Override
+ protected S reuseOrCreate(Object reuse) {
+ if (structClass.isInstance(reuse)) {
+ return structClass.cast(reuse);
+ } else {
+ return ctor.newInstance(structType);
+ }
+ }
+
+ @Override
+ protected Object get(S struct, int pos) {
+ return struct.get(pos, Object.class);
+ }
+
+ @Override
+ protected void set(S struct, int pos, Object value) {
+ struct.set(pos, value);
+ }
+ }
+
+ private static class RecordReader extends ValueReaders.PlannedStructReader<GenericRecord> {
+ private final Types.StructType structType;
+
+ private RecordReader(
+ List<Pair<Integer, ValueReader<?>>> readPlan, Types.StructType structType) {
+ super(readPlan);
+ this.structType = structType;
+ }
+
+ @Override
+ protected GenericRecord reuseOrCreate(Object reuse) {
+ if (reuse instanceof GenericRecord) {
+ return (GenericRecord) reuse;
+ } else {
+ return GenericRecord.create(structType);
+ }
+ }
+
+ @Override
+ protected Object get(GenericRecord struct, int pos) {
+ return struct.get(pos);
+ }
+
+ @Override
+ protected void set(GenericRecord struct, int pos, Object value) {
+ struct.set(pos, value);
+ }
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
index d530bc1854..246671076c 100644
--- a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
+++ b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
@@ -181,6 +181,83 @@ public class ValueReaders {
return new PlannedIndexedReader<>(recordSchema, recordClass, readPlan);
}
+ public static ValueReader<Void> skipStruct(List<ValueReader<?>> readers) {
+ return new SkipStructReader(readers);
+ }
+
+ /**
+ * Builds a read plan for record classes that use planned reads instead of a ResolvingDecoder.
+ *
+ * @param expected expected StructType
+ * @param record Avro record schema
+ * @param fieldReaders list of readers for each field in the Avro record schema
+ * @param idToConstant a map of field ID to constants values
+ * @return a read plan that is a list of (position, reader) pairs
+ */
+ static List<Pair<Integer, ValueReader<?>>> buildReadPlan(
+ Types.StructType expected,
+ Schema record,
+ List<ValueReader<?>> fieldReaders,
+ Map<Integer, Object> idToConstant) {
+ Map<Integer, Integer> idToPos = idToPos(expected);
+
+ List<Pair<Integer, ValueReader<?>>> readPlan = Lists.newArrayList();
+ List<Schema.Field> fileFields = record.getFields();
+ for (int pos = 0; pos < fileFields.size(); pos += 1) {
+ Schema.Field field = fileFields.get(pos);
+ ValueReader<?> fieldReader = fieldReaders.get(pos);
+ Integer fieldId = AvroSchemaUtil.fieldId(field);
+ Integer projectionPos = idToPos.remove(fieldId);
+
+ Object constant = idToConstant.get(fieldId);
+ if (projectionPos != null && constant != null) {
+ readPlan.add(
+ Pair.of(projectionPos, ValueReaders.replaceWithConstant(fieldReader, constant)));
+ } else {
+ readPlan.add(Pair.of(projectionPos, fieldReader));
+ }
+ }
+
+ // handle any expected columns that are not in the data file
+ for (Map.Entry<Integer, Integer> idAndPos : idToPos.entrySet()) {
+ int fieldId = idAndPos.getKey();
+ int pos = idAndPos.getValue();
+
+ Object constant = idToConstant.get(fieldId);
+ Types.NestedField field = expected.field(fieldId);
+ if (constant != null) {
+ readPlan.add(Pair.of(pos, ValueReaders.constant(constant)));
+ } else if (field.initialDefault() != null) {
+ readPlan.add(Pair.of(pos, ValueReaders.constant(field.initialDefault())));
+ } else if (fieldId == MetadataColumns.IS_DELETED.fieldId()) {
+ readPlan.add(Pair.of(pos, ValueReaders.constant(false)));
+ } else if (fieldId == MetadataColumns.ROW_POSITION.fieldId()) {
+ readPlan.add(Pair.of(pos, ValueReaders.positions()));
+ } else if (field.isOptional()) {
+ readPlan.add(Pair.of(pos, ValueReaders.constant(null)));
+ } else {
+ throw new IllegalArgumentException(
+ String.format("Missing required field: %s", field.name()));
+ }
+ }
+
+ return readPlan;
+ }
+
+ private static Map<Integer, Integer> idToPos(Types.StructType struct) {
+ Map<Integer, Integer> idToPos = Maps.newHashMap();
+
+ if (struct != null) {
+ List<Types.NestedField> fields = struct.fields();
+ for (int pos = 0; pos < fields.size(); pos += 1) {
+ Types.NestedField field = fields.get(pos);
+ idToPos.put(field.fieldId(), pos);
+ }
+ }
+
+ return idToPos;
+ }
+
private static class NullReader implements ValueReader<Object> {
private static final NullReader INSTANCE = new NullReader();
@@ -777,6 +854,27 @@ public class ValueReaders {
}
}
+ private static class SkipStructReader implements ValueReader<Void> {
+ private final ValueReader<?>[] readers;
+
+ private SkipStructReader(List<ValueReader<?>> readers) {
+ this.readers = readers.toArray(ValueReader[]::new);
+ }
+
+ @Override
+ public Void read(Decoder decoder, Object reuse) throws IOException {
+ skip(decoder);
+ return null;
+ }
+
+ @Override
+ public void skip(Decoder decoder) throws IOException {
+ for (ValueReader<?> reader : readers) {
+ reader.skip(decoder);
+ }
+ }
+ }
+
public abstract static class PlannedStructReader<S>
implements ValueReader<S>, SupportsRowPosition {
private final ValueReader<?>[] readers;
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestReader.java b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
index c9d0e29270..e45415f1f2 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestReader.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
@@ -44,7 +44,8 @@ public class TestManifestReader extends TestBase {
"fileOrdinal",
"fileSequenceNumber",
"fromProjectionPos",
- "manifestLocation")
+ "manifestLocation",
+ "partitionData.partitionType.fieldsById")
.build();
@TestTemplate