This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 604422b056 Core: Refactor internal Avro reader to resolve schemas directly (#9366)
604422b056 is described below

commit 604422b056efee145f69944c39b4d99787dfd9f1
Author: Ryan Blue <[email protected]>
AuthorDate: Mon Jan 1 11:03:41 2024 -0800

    Core: Refactor internal Avro reader to resolve schemas directly (#9366)
---
 .../main/java/org/apache/iceberg/avro/Avro.java    |  52 ++-
 .../java/org/apache/iceberg/avro/AvroIterable.java |   4 +-
 .../iceberg/avro/AvroWithPartnerVisitor.java       | 211 ++++++++++++
 .../org/apache/iceberg/avro/GenericAvroReader.java | 166 +++++++--
 .../iceberg/avro/NameMappingDatumReader.java       |  66 ++++
 ...ValueReader.java => SupportsCustomRecords.java} |  10 +-
 .../java/org/apache/iceberg/avro/ValueReader.java  |   4 +
 .../java/org/apache/iceberg/avro/ValueReaders.java | 369 +++++++++++++++++++--
 .../apache/iceberg/avro/TestAvroNameMapping.java   |  23 +-
 9 files changed, 813 insertions(+), 92 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java
index 85cc8d9020..afc502e20d 100644
--- a/core/src/main/java/org/apache/iceberg/avro/Avro.java
+++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java
@@ -61,6 +61,7 @@ import org.apache.iceberg.io.DeleteSchemaUtil;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.mapping.MappingUtil;
 import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -610,11 +611,12 @@ public class Avro {
     private org.apache.iceberg.Schema schema = null;
     private Function<Schema, DatumReader<?>> createReaderFunc = null;
     private BiFunction<org.apache.iceberg.Schema, Schema, DatumReader<?>> createReaderBiFunc = null;
+    private Function<org.apache.iceberg.Schema, DatumReader<?>> createResolvingReaderFunc = null;
 
     @SuppressWarnings("UnnecessaryLambda")
-    private final Function<Schema, DatumReader<?>> defaultCreateReaderFunc =
+    private final Function<org.apache.iceberg.Schema, DatumReader<?>> defaultCreateReaderFunc =
         readSchema -> {
-          GenericAvroReader<?> reader = new GenericAvroReader<>(readSchema);
+          GenericAvroReader<?> reader = GenericAvroReader.create(readSchema);
           reader.setClassLoader(loader);
           return reader;
         };
@@ -627,15 +629,28 @@ public class Avro {
       this.file = file;
     }
 
+    public ReadBuilder createResolvingReader(
+        Function<org.apache.iceberg.Schema, DatumReader<?>> readerFunction) {
+      Preconditions.checkState(
+          createReaderBiFunc == null && createReaderFunc == null,
+          "Cannot set multiple read builder functions");
+      this.createResolvingReaderFunc = readerFunction;
+      return this;
+    }
+
     public ReadBuilder createReaderFunc(Function<Schema, DatumReader<?>> readerFunction) {
-      Preconditions.checkState(createReaderBiFunc == null, "Cannot set multiple createReaderFunc");
+      Preconditions.checkState(
+          createReaderBiFunc == null && createResolvingReaderFunc == null,
+          "Cannot set multiple read builder functions");
       this.createReaderFunc = readerFunction;
       return this;
     }
 
     public ReadBuilder createReaderFunc(
         BiFunction<org.apache.iceberg.Schema, Schema, DatumReader<?>> readerFunction) {
-      Preconditions.checkState(createReaderFunc == null, "Cannot set multiple createReaderFunc");
+      Preconditions.checkState(
+          createReaderFunc == null && createResolvingReaderFunc == null,
+          "Cannot set multiple read builder functions");
       this.createReaderBiFunc = readerFunction;
       return this;
     }
@@ -683,23 +698,34 @@ public class Avro {
       return this;
     }
 
+    @SuppressWarnings("unchecked")
     public <D> AvroIterable<D> build() {
       Preconditions.checkNotNull(schema, "Schema is required");
-      Function<Schema, DatumReader<?>> readerFunc;
+
+      if (null == nameMapping) {
+        this.nameMapping = MappingUtil.create(schema);
+      }
+
+      DatumReader<D> reader;
       if (createReaderBiFunc != null) {
-        readerFunc = avroSchema -> createReaderBiFunc.apply(schema, avroSchema);
+        reader =
+            new ProjectionDatumReader<>(
+                avroSchema -> createReaderBiFunc.apply(schema, avroSchema), schema, renames, null);
       } else if (createReaderFunc != null) {
-        readerFunc = createReaderFunc;
+        reader = new ProjectionDatumReader<>(createReaderFunc, schema, renames, null);
+      } else if (createResolvingReaderFunc != null) {
+        reader = (DatumReader<D>) createResolvingReaderFunc.apply(schema);
       } else {
-        readerFunc = defaultCreateReaderFunc;
+        reader = (DatumReader<D>) defaultCreateReaderFunc.apply(schema);
+      }
+
+      if (reader instanceof SupportsCustomRecords) {
+        ((SupportsCustomRecords) reader).setClassLoader(loader);
+        ((SupportsCustomRecords) reader).setRenames(renames);
       }
 
       return new AvroIterable<>(
-          file,
-          new ProjectionDatumReader<>(readerFunc, schema, renames, nameMapping),
-          start,
-          length,
-          reuseContainers);
+          file, new NameMappingDatumReader<>(nameMapping, reader), start, length, reuseContainers);
     }
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroIterable.java b/core/src/main/java/org/apache/iceberg/avro/AvroIterable.java
index 49acb8010b..3bcc6a4799 100644
--- a/core/src/main/java/org/apache/iceberg/avro/AvroIterable.java
+++ b/core/src/main/java/org/apache/iceberg/avro/AvroIterable.java
@@ -31,6 +31,7 @@ import org.apache.iceberg.io.CloseableGroup;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Suppliers;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 
 public class AvroIterable<D> extends CloseableGroup implements CloseableIterable<D> {
@@ -78,7 +79,8 @@ public class AvroIterable<D> extends CloseableGroup implements CloseableIterable
     if (start != null) {
       if (reader instanceof SupportsRowPosition) {
         ((SupportsRowPosition) reader)
-            .setRowPositionSupplier(() -> AvroIO.findStartingRowPos(file::newStream, start));
+            .setRowPositionSupplier(
+                Suppliers.memoize(() -> AvroIO.findStartingRowPos(file::newStream, start)));
       }
       fileReader = new AvroRangeIterator<>(fileReader, start, end);
     } else if (reader instanceof SupportsRowPosition) {
diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerVisitor.java b/core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerVisitor.java
new file mode 100644
index 0000000000..0147dbf37d
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerVisitor.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.avro;
+
+import java.util.Deque;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+public class AvroWithPartnerVisitor<P, R> {
+  public interface PartnerAccessors<P> {
+    P fieldPartner(P partnerStruct, Integer fieldId, String name);
+
+    P mapKeyPartner(P partnerMap);
+
+    P mapValuePartner(P partnerMap);
+
+    P listElementPartner(P partnerList);
+  }
+
+  static class FieldIDAccessors implements AvroWithPartnerVisitor.PartnerAccessors<Type> {
+    private static final FieldIDAccessors INSTANCE = new FieldIDAccessors();
+
+    public static FieldIDAccessors get() {
+      return INSTANCE;
+    }
+
+    @Override
+    public Type fieldPartner(Type partner, Integer fieldId, String name) {
+      Types.NestedField field = partner.asStructType().field(fieldId);
+      return field != null ? field.type() : null;
+    }
+
+    @Override
+    public Type mapKeyPartner(Type partner) {
+      return partner.asMapType().keyType();
+    }
+
+    @Override
+    public Type mapValuePartner(Type partner) {
+      return partner.asMapType().valueType();
+    }
+
+    @Override
+    public Type listElementPartner(Type partner) {
+      return partner.asListType().elementType();
+    }
+  }
+
+  /** Used to fail on recursive types. */
+  private Deque<String> recordLevels = Lists.newLinkedList();
+
+  public R record(P partner, Schema record, List<R> fieldResults) {
+    return null;
+  }
+
+  public R union(P partner, Schema union, List<R> optionResults) {
+    return null;
+  }
+
+  public R array(P partner, Schema array, R elementResult) {
+    return null;
+  }
+
+  public R arrayMap(P partner, Schema map, R keyResult, R valueResult) {
+    return null;
+  }
+
+  public R map(P partner, Schema map, R valueResult) {
+    return null;
+  }
+
+  public R primitive(P partner, Schema primitive) {
+    return null;
+  }
+
+  public static <P, R> R visit(
+      P partner,
+      Schema schema,
+      AvroWithPartnerVisitor<P, R> visitor,
+      PartnerAccessors<P> accessors) {
+    switch (schema.getType()) {
+      case RECORD:
+        return visitRecord(partner, schema, visitor, accessors);
+
+      case UNION:
+        return visitUnion(partner, schema, visitor, accessors);
+
+      case ARRAY:
+        return visitArray(partner, schema, visitor, accessors);
+
+      case MAP:
+        return visitor.map(
+            partner,
+            schema,
+            visit(
+                partner != null ? accessors.mapValuePartner(partner) : null,
+                schema.getValueType(),
+                visitor,
+                accessors));
+
+      default:
+        return visitor.primitive(partner, schema);
+    }
+  }
+
+  private static <P, R> R visitRecord(
+      P partnerStruct,
+      Schema record,
+      AvroWithPartnerVisitor<P, R> visitor,
+      PartnerAccessors<P> accessors) {
+    // check to make sure this hasn't been visited before
+    String recordName = record.getFullName();
+    Preconditions.checkState(
+        !visitor.recordLevels.contains(recordName),
+        "Cannot process recursive Avro record %s",
+        recordName);
+    visitor.recordLevels.push(recordName);
+
+    List<Schema.Field> fields = record.getFields();
+    List<R> results = Lists.newArrayListWithExpectedSize(fields.size());
+    for (int pos = 0; pos < fields.size(); pos += 1) {
+      Schema.Field field = fields.get(pos);
+      Integer fieldId = AvroSchemaUtil.fieldId(field);
+
+      P fieldPartner =
+          partnerStruct != null && fieldId != null
+              ? accessors.fieldPartner(partnerStruct, fieldId, field.name())
+              : null;
+      results.add(visit(fieldPartner, field.schema(), visitor, accessors));
+    }
+
+    visitor.recordLevels.pop();
+
+    return visitor.record(partnerStruct, record, results);
+  }
+
+  private static <P, R> R visitUnion(
+      P partner,
+      Schema union,
+      AvroWithPartnerVisitor<P, R> visitor,
+      PartnerAccessors<P> accessors) {
+    Preconditions.checkArgument(
+        AvroSchemaUtil.isOptionSchema(union), "Cannot visit non-option union: %s", union);
+
+    List<Schema> types = union.getTypes();
+    List<R> options = Lists.newArrayListWithExpectedSize(types.size());
+    for (Schema branch : types) {
+      options.add(visit(partner, branch, visitor, accessors));
+    }
+
+    return visitor.union(partner, union, options);
+  }
+
+  private static <P, R> R visitArray(
+      P partnerArray,
+      Schema array,
+      AvroWithPartnerVisitor<P, R> visitor,
+      PartnerAccessors<P> accessors) {
+    if (array.getLogicalType() instanceof LogicalMap) {
+      Preconditions.checkState(
+          AvroSchemaUtil.isKeyValueSchema(array.getElementType()),
+          "Cannot visit invalid logical map type: %s",
+          array);
+
+      List<Schema.Field> keyValueFields = array.getElementType().getFields();
+      return visitor.arrayMap(
+          partnerArray,
+          array,
+          visit(
+              partnerArray != null ? accessors.mapKeyPartner(partnerArray) : null,
+              keyValueFields.get(0).schema(),
+              visitor,
+              accessors),
+          visit(
+              partnerArray != null ? accessors.mapValuePartner(partnerArray) : null,
+              keyValueFields.get(1).schema(),
+              visitor,
+              accessors));
+
+    } else {
+      return visitor.array(
+          partnerArray,
+          array,
+          visit(
+              partnerArray != null ? accessors.listElementPartner(partnerArray) : null,
+              array.getElementType(),
+              visitor,
+              accessors));
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java b/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java
index 0fa2e79581..d89489d92a 100644
--- a/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java
+++ b/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.avro;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import java.util.function.Supplier;
 import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
@@ -27,39 +28,68 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
+import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.common.DynClasses;
-import org.apache.iceberg.data.avro.DecoderResolver;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
 
-public class GenericAvroReader<T> implements DatumReader<T>, SupportsRowPosition {
+public class GenericAvroReader<T>
+    implements DatumReader<T>, SupportsRowPosition, SupportsCustomRecords {
 
-  private final Schema readSchema;
+  private final Types.StructType expectedType;
   private ClassLoader loader = Thread.currentThread().getContextClassLoader();
+  private Map<String, String> renames = ImmutableMap.of();
+  private Map<Integer, ?> idToConstant = ImmutableMap.of();
   private Schema fileSchema = null;
   private ValueReader<T> reader = null;
 
+  public static <D> GenericAvroReader<D> create(org.apache.iceberg.Schema schema) {
+    return new GenericAvroReader<>(schema);
+  }
+
   public static <D> GenericAvroReader<D> create(Schema schema) {
     return new GenericAvroReader<>(schema);
   }
 
+  GenericAvroReader(org.apache.iceberg.Schema readSchema) {
+    this.expectedType = readSchema.asStruct();
+  }
+
   GenericAvroReader(Schema readSchema) {
-    this.readSchema = readSchema;
+    this.expectedType = AvroSchemaUtil.convert(readSchema).asStructType();
   }
 
   @SuppressWarnings("unchecked")
   private void initReader() {
-    this.reader = (ValueReader<T>) AvroSchemaVisitor.visit(readSchema, new ReadBuilder(loader));
+    this.reader =
+        (ValueReader<T>)
+            AvroWithPartnerVisitor.visit(
+                expectedType,
+                fileSchema,
+                new ResolvingReadBuilder(expectedType, fileSchema.getFullName()),
+                AvroWithPartnerVisitor.FieldIDAccessors.get());
   }
 
   @Override
   public void setSchema(Schema schema) {
-    this.fileSchema = Schema.applyAliases(schema, readSchema);
+    this.fileSchema = schema;
     initReader();
   }
 
+  @Override
   public void setClassLoader(ClassLoader newClassLoader) {
     this.loader = newClassLoader;
   }
 
+  @Override
+  public void setRenames(Map<String, String> renames) {
+    this.renames = renames;
+  }
+
   @Override
   public void setRowPositionSupplier(Supplier<Long> posSupplier) {
     if (reader instanceof SupportsRowPosition) {
@@ -69,62 +99,108 @@ public class GenericAvroReader<T> implements DatumReader<T>, SupportsRowPosition
 
   @Override
   public T read(T reuse, Decoder decoder) throws IOException {
-    return DecoderResolver.resolveAndRead(decoder, readSchema, fileSchema, reader, reuse);
+    return reader.read(decoder, reuse);
   }
 
-  private static class ReadBuilder extends AvroSchemaVisitor<ValueReader<?>> {
-    private final ClassLoader loader;
+  private class ResolvingReadBuilder extends AvroWithPartnerVisitor<Type, ValueReader<?>> {
+    private final Map<Type, Schema> avroSchemas;
 
-    private ReadBuilder(ClassLoader loader) {
-      this.loader = loader;
+    private ResolvingReadBuilder(Types.StructType expectedType, String rootName) {
+      this.avroSchemas = AvroSchemaUtil.convertTypes(expectedType, rootName);
     }
 
     @Override
-    @SuppressWarnings("unchecked")
-    public ValueReader<?> record(Schema record, List<String> names, List<ValueReader<?>> fields) {
-      try {
-        Class<?> recordClass =
-            DynClasses.builder().loader(loader).impl(record.getFullName()).buildChecked();
-        if (IndexedRecord.class.isAssignableFrom(recordClass)) {
-          return ValueReaders.record(fields, (Class<? extends IndexedRecord>) recordClass, record);
+    public ValueReader<?> record(Type partner, Schema record, List<ValueReader<?>> fieldResults) {
+      Types.StructType expected = partner != null ? partner.asStructType() : null;
+      Map<Integer, Integer> idToPos = idToPos(expected);
+
+      List<Pair<Integer, ValueReader<?>>> readPlan = Lists.newArrayList();
+      List<Schema.Field> fileFields = record.getFields();
+      for (int pos = 0; pos < fileFields.size(); pos += 1) {
+        Schema.Field field = fileFields.get(pos);
+        ValueReader<?> fieldReader = fieldResults.get(pos);
+        Integer fieldId = AvroSchemaUtil.fieldId(field);
+        Integer projectionPos = idToPos.remove(fieldId);
+
+        Object constant = idToConstant.get(fieldId);
+        if (projectionPos != null && constant != null) {
+          readPlan.add(
+              Pair.of(projectionPos, ValueReaders.replaceWithConstant(fieldReader, constant)));
+        } else {
+          readPlan.add(Pair.of(projectionPos, fieldReader));
+        }
+      }
+
+      // handle any expected columns that are not in the data file
+      for (Map.Entry<Integer, Integer> idAndPos : idToPos.entrySet()) {
+        int fieldId = idAndPos.getKey();
+        int pos = idAndPos.getValue();
+
+        Object constant = idToConstant.get(fieldId);
+        Types.NestedField field = expected.field(fieldId);
+        if (constant != null) {
+          readPlan.add(Pair.of(pos, ValueReaders.constant(constant)));
+        } else if (fieldId == MetadataColumns.IS_DELETED.fieldId()) {
+          readPlan.add(Pair.of(pos, ValueReaders.constant(false)));
+        } else if (fieldId == MetadataColumns.ROW_POSITION.fieldId()) {
+          readPlan.add(Pair.of(pos, ValueReaders.positions()));
+        } else if (field.isOptional()) {
+          readPlan.add(Pair.of(pos, ValueReaders.constant(null)));
+        } else {
+          throw new IllegalArgumentException(
+              String.format("Missing required field: %s", field.name()));
         }
+      }
 
-        return ValueReaders.record(fields, record);
+      return recordReader(readPlan, avroSchemas.get(partner), record.getFullName());
+    }
 
-      } catch (ClassNotFoundException e) {
-        return ValueReaders.record(fields, record);
+    @SuppressWarnings("unchecked")
+    private ValueReader<?> recordReader(
+        List<Pair<Integer, ValueReader<?>>> readPlan, Schema avroSchema, String recordName) {
+      String className = renames.getOrDefault(recordName, recordName);
+      if (className != null) {
+        try {
+          Class<?> recordClass = DynClasses.builder().loader(loader).impl(className).buildChecked();
+          if (IndexedRecord.class.isAssignableFrom(recordClass)) {
+            return ValueReaders.record(
+                avroSchema, (Class<? extends IndexedRecord>) recordClass, readPlan);
+          }
+        } catch (ClassNotFoundException e) {
+          // use a generic record reader below
+        }
       }
+
+      return ValueReaders.record(avroSchema, readPlan);
     }
 
     @Override
-    public ValueReader<?> union(Schema union, List<ValueReader<?>> options) {
+    public ValueReader<?> union(Type partner, Schema union, List<ValueReader<?>> options) {
       return ValueReaders.union(options);
     }
 
     @Override
-    public ValueReader<?> array(Schema array, ValueReader<?> elementReader) {
-      if (array.getLogicalType() instanceof LogicalMap) {
-        ValueReaders.StructReader<?> keyValueReader = (ValueReaders.StructReader) elementReader;
-        ValueReader<?> keyReader = keyValueReader.reader(0);
-        ValueReader<?> valueReader = keyValueReader.reader(1);
-
-        if (keyReader == ValueReaders.utf8s()) {
-          return ValueReaders.arrayMap(ValueReaders.strings(), valueReader);
-        }
-
-        return ValueReaders.arrayMap(keyReader, valueReader);
+    public ValueReader<?> arrayMap(
+        Type partner, Schema map, ValueReader<?> keyReader, ValueReader<?> valueReader) {
+      if (keyReader == ValueReaders.utf8s()) {
+        return ValueReaders.arrayMap(ValueReaders.strings(), valueReader);
       }
 
+      return ValueReaders.arrayMap(keyReader, valueReader);
+    }
+
+    @Override
+    public ValueReader<?> array(Type partner, Schema array, ValueReader<?> elementReader) {
       return ValueReaders.array(elementReader);
     }
 
     @Override
-    public ValueReader<?> map(Schema map, ValueReader<?> valueReader) {
+    public ValueReader<?> map(Type partner, Schema map, ValueReader<?> valueReader) {
       return ValueReaders.map(ValueReaders.strings(), valueReader);
     }
 
     @Override
-    public ValueReader<?> primitive(Schema primitive) {
+    public ValueReader<?> primitive(Type partner, Schema primitive) {
       LogicalType logicalType = primitive.getLogicalType();
       if (logicalType != null) {
         switch (logicalType.getName()) {
@@ -163,10 +239,16 @@ public class GenericAvroReader<T> implements DatumReader<T>, SupportsRowPosition
         case BOOLEAN:
           return ValueReaders.booleans();
         case INT:
+          if (partner != null && partner.typeId() == Type.TypeID.LONG) {
+            return ValueReaders.intsAsLongs();
+          }
           return ValueReaders.ints();
         case LONG:
           return ValueReaders.longs();
         case FLOAT:
+          if (partner != null && partner.typeId() == Type.TypeID.DOUBLE) {
+            return ValueReaders.floatsAsDoubles();
+          }
           return ValueReaders.floats();
         case DOUBLE:
           return ValueReaders.doubles();
@@ -182,5 +264,19 @@ public class GenericAvroReader<T> implements DatumReader<T>, SupportsRowPosition
           throw new IllegalArgumentException("Unsupported type: " + primitive);
       }
     }
+
+    private Map<Integer, Integer> idToPos(Types.StructType struct) {
+      Map<Integer, Integer> idToPos = Maps.newHashMap();
+
+      if (struct != null) {
+        List<Types.NestedField> fields = struct.fields();
+        for (int pos = 0; pos < fields.size(); pos += 1) {
+          Types.NestedField field = fields.get(pos);
+          idToPos.put(field.fieldId(), pos);
+        }
+      }
+
+      return idToPos;
+    }
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/avro/NameMappingDatumReader.java b/core/src/main/java/org/apache/iceberg/avro/NameMappingDatumReader.java
new file mode 100644
index 0000000000..83f2beabee
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/avro/NameMappingDatumReader.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.avro;
+
+import java.io.IOException;
+import java.util.function.Supplier;
+import org.apache.avro.Schema;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.iceberg.mapping.NameMapping;
+
+/**
+ * A delegating DatumReader that applies a name mapping to a file schema to enable reading Avro
+ * files that were written without field IDs.
+ *
+ * @param <D> Java class of datums produced by this reader
+ */
+public class NameMappingDatumReader<D> implements DatumReader<D>, SupportsRowPosition {
+  private final NameMapping nameMapping;
+  private final DatumReader<D> wrapped;
+
+  public NameMappingDatumReader(NameMapping nameMapping, DatumReader<D> wrapped) {
+    this.nameMapping = nameMapping;
+    this.wrapped = wrapped;
+  }
+
+  @Override
+  public void setSchema(Schema newFileSchema) {
+    Schema fileSchema;
+    if (AvroSchemaUtil.hasIds(newFileSchema)) {
+      fileSchema = newFileSchema;
+    } else {
+      fileSchema = AvroSchemaUtil.applyNameMapping(newFileSchema, nameMapping);
+    }
+
+    wrapped.setSchema(fileSchema);
+  }
+
+  @Override
+  public D read(D reuse, Decoder in) throws IOException {
+    return wrapped.read(reuse, in);
+  }
+
+  @Override
+  public void setRowPositionSupplier(Supplier<Long> posSupplier) {
+    if (wrapped instanceof SupportsRowPosition) {
+      ((SupportsRowPosition) wrapped).setRowPositionSupplier(posSupplier);
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReader.java b/core/src/main/java/org/apache/iceberg/avro/SupportsCustomRecords.java
similarity index 79%
copy from core/src/main/java/org/apache/iceberg/avro/ValueReader.java
copy to core/src/main/java/org/apache/iceberg/avro/SupportsCustomRecords.java
index 5470b8168f..c05e206e13 100644
--- a/core/src/main/java/org/apache/iceberg/avro/ValueReader.java
+++ b/core/src/main/java/org/apache/iceberg/avro/SupportsCustomRecords.java
@@ -18,9 +18,11 @@
  */
 package org.apache.iceberg.avro;
 
-import java.io.IOException;
-import org.apache.avro.io.Decoder;
+import java.util.Map;
 
-public interface ValueReader<T> {
-  T read(Decoder decoder, Object reuse) throws IOException;
+/** An interface for Avro DatumReaders to support custom record classes. */
+interface SupportsCustomRecords {
+  void setClassLoader(ClassLoader loader);
+
+  void setRenames(Map<String, String> renames);
 }
diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReader.java b/core/src/main/java/org/apache/iceberg/avro/ValueReader.java
index 5470b8168f..d01bd74f53 100644
--- a/core/src/main/java/org/apache/iceberg/avro/ValueReader.java
+++ b/core/src/main/java/org/apache/iceberg/avro/ValueReader.java
@@ -23,4 +23,8 @@ import org.apache.avro.io.Decoder;
 
 public interface ValueReader<T> {
   T read(Decoder decoder, Object reuse) throws IOException;
+
+  default void skip(Decoder decoder) throws IOException {
+    read(decoder, null);
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
index 19789cce82..d530bc1854 100644
--- a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
+++ b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
@@ -31,6 +31,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.UUID;
 import java.util.function.Supplier;
 import org.apache.avro.Schema;
@@ -44,6 +45,7 @@ import org.apache.iceberg.common.DynConstructors;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.UUIDUtil;
 
 public class ValueReaders {
@@ -53,6 +55,14 @@ public class ValueReaders {
     return NullReader.INSTANCE;
   }
 
+  public static <T> ValueReader<T> constant(T value) {
+    return new ConstantReader<>(value);
+  }
+
+  public static <T> ValueReader<T> replaceWithConstant(ValueReader<?> reader, T value) {
+    return new ReplaceWithConstantReader<>(reader, value);
+  }
+
   public static ValueReader<Boolean> booleans() {
     return BooleanReader.INSTANCE;
   }
@@ -61,6 +71,10 @@ public class ValueReaders {
     return IntegerReader.INSTANCE;
   }
 
+  public static ValueReader<Long> intsAsLongs() {
+    return IntegerAsLongReader.INSTANCE;
+  }
+
   public static ValueReader<Long> longs() {
     return LongReader.INSTANCE;
   }
@@ -69,6 +83,10 @@ public class ValueReaders {
     return FloatReader.INSTANCE;
   }
 
+  public static ValueReader<Double> floatsAsDoubles() {
+    return FloatAsDoubleReader.INSTANCE;
+  }
+
   public static ValueReader<Double> doubles() {
     return DoubleReader.INSTANCE;
   }
@@ -125,6 +143,10 @@ public class ValueReaders {
     return new UnionReader(readers);
   }
 
+  public static ValueReader<Long> positions() {
+    return new PositionReader();
+  }
+
   public static <T> ValueReader<Collection<T>> array(ValueReader<T> elementReader) {
     return new ArrayReader<>(elementReader);
   }
@@ -149,6 +171,16 @@ public class ValueReaders {
     return new IndexedRecordReader<>(readers, recordClass, recordSchema);
   }
 
+  public static ValueReader<?> record(
+      Schema recordSchema, List<Pair<Integer, ValueReader<?>>> readPlan) {
+    return new PlannedRecordReader(recordSchema, readPlan);
+  }
+
+  public static <R extends IndexedRecord> ValueReader<R> record(
+      Schema recordSchema, Class<R> recordClass, List<Pair<Integer, ValueReader<?>>> readPlan) {
+    return new PlannedIndexedReader<>(recordSchema, recordClass, readPlan);
+  }
+
   private static class NullReader implements ValueReader<Object> {
     private static final NullReader INSTANCE = new NullReader();
 
@@ -159,6 +191,47 @@ public class ValueReaders {
       decoder.readNull();
       return null;
     }
+
+    @Override
+    public void skip(Decoder decoder) throws IOException {
+      // same decoder call as read; there is simply no value to return
+      decoder.readNull();
+    }
+  }
+
+  /** Reader that returns a fixed value and never uses the decoder. */
+  private static class ConstantReader<T> implements ValueReader<T> {
+    private final T constant;
+
+    private ConstantReader(T constant) {
+      this.constant = constant;
+    }
+
+    @Override
+    public T read(Decoder decoder, Object reuse) throws IOException {
+      // both arguments are ignored: the constant does not come from the encoded data
+      return constant;
+    }
+
+    // intentionally empty: this reader consumes nothing, so there is nothing to skip
+    @Override
+    public void skip(Decoder decoder) throws IOException {}
+  }
+
+  /**
+   * Reader that advances past a value present in the encoded data and returns a constant in its
+   * place.
+   *
+   * <p>The replaced reader is used only to move the decoder forward by one value; nothing it
+   * decodes is ever returned.
+   */
+  private static class ReplaceWithConstantReader<T> extends ConstantReader<T> {
+    private final ValueReader<?> replaced;
+
+    private ReplaceWithConstantReader(ValueReader<?> replaced, T constant) {
+      super(constant);
+      this.replaced = replaced;
+    }
+
+    @Override
+    public T read(Decoder decoder, Object reuse) throws IOException {
+      // the encoded value is discarded, so skip it rather than decode it. this also avoids
+      // handing the replaced reader a reuse object that it did not produce.
+      replaced.skip(decoder);
+      return super.read(decoder, reuse);
+    }
+
+    @Override
+    public void skip(Decoder decoder) throws IOException {
+      replaced.skip(decoder);
+    }
   }
 
   private static class BooleanReader implements ValueReader<Boolean> {
@@ -170,6 +243,11 @@ public class ValueReaders {
     public Boolean read(Decoder decoder, Object ignored) throws IOException {
       return decoder.readBoolean();
     }
+
+    @Override
+    public void skip(Decoder decoder) throws IOException {
+      // decode and discard the boolean to advance the decoder
+      decoder.readBoolean();
+    }
   }
 
   private static class IntegerReader implements ValueReader<Integer> {
@@ -181,6 +259,27 @@ public class ValueReaders {
     public Integer read(Decoder decoder, Object ignored) throws IOException {
       return decoder.readInt();
     }
+
+    @Override
+    public void skip(Decoder decoder) throws IOException {
+      // decode and discard the int to advance the decoder
+      decoder.readInt();
+    }
+  }
+
+  /** Singleton reader that decodes an int from the data and returns it widened to a Long. */
+  private static class IntegerAsLongReader implements ValueReader<Long> {
+    private static final IntegerAsLongReader INSTANCE = new 
IntegerAsLongReader();
+
+    private IntegerAsLongReader() {}
+
+    @Override
+    public Long read(Decoder decoder, Object ignored) throws IOException {
+      // the data holds an int; only the returned type is widened
+      return (long) decoder.readInt();
+    }
+
+    @Override
+    public void skip(Decoder decoder) throws IOException {
+      // skipping consumes an int, matching what read consumes
+      decoder.readInt();
+    }
   }
 
   private static class LongReader implements ValueReader<Long> {
@@ -192,6 +291,11 @@ public class ValueReaders {
     public Long read(Decoder decoder, Object ignored) throws IOException {
       return decoder.readLong();
     }
+
+    @Override
+    public void skip(Decoder decoder) throws IOException {
+      // decode and discard the long to advance the decoder
+      decoder.readLong();
+    }
   }
 
   private static class FloatReader implements ValueReader<Float> {
@@ -203,6 +307,27 @@ public class ValueReaders {
     public Float read(Decoder decoder, Object ignored) throws IOException {
       return decoder.readFloat();
     }
+
+    @Override
+    public void skip(Decoder decoder) throws IOException {
+      // advance by 4 bytes, the size of a binary-encoded Avro float, without decoding the value
+      decoder.skipFixed(4);
+    }
+  }
+
+  /** Singleton reader that decodes a float from the data and returns it widened to a Double. */
+  private static class FloatAsDoubleReader implements ValueReader<Double> {
+    private static final FloatAsDoubleReader INSTANCE = new 
FloatAsDoubleReader();
+
+    private FloatAsDoubleReader() {}
+
+    @Override
+    public Double read(Decoder decoder, Object ignored) throws IOException {
+      // the data holds a float; only the returned type is widened
+      return (double) decoder.readFloat();
+    }
+
+    @Override
+    public void skip(Decoder decoder) throws IOException {
+      // the encoded value is a float, so skip 4 bytes rather than the 8 of a double
+      decoder.skipFixed(4);
+    }
   }
 
   private static class DoubleReader implements ValueReader<Double> {
@@ -214,6 +339,11 @@ public class ValueReaders {
     public Double read(Decoder decoder, Object ignored) throws IOException {
       return decoder.readDouble();
     }
+
+    @Override
+    public void skip(Decoder decoder) throws IOException {
+      // advance by 8 bytes, the size of a binary-encoded Avro double, without decoding the value
+      decoder.skipFixed(8);
+    }
   }
 
   private static class StringReader implements ValueReader<String> {
@@ -231,6 +361,11 @@ public class ValueReaders {
       //      byte[] bytes = new byte[length];
       //      decoder.readFixed(bytes, 0, length);
     }
+
+    @Override
+    public void skip(Decoder decoder) throws IOException {
+      // let the decoder step over the string without building a String
+      decoder.skipString();
+    }
   }
 
   private static class Utf8Reader implements ValueReader<Utf8> {
@@ -250,6 +385,11 @@ public class ValueReaders {
       //      byte[] bytes = new byte[length];
       //      decoder.readFixed(bytes, 0, length);
     }
+
+    @Override
+    public void skip(Decoder decoder) throws IOException {
+      // let the decoder step over the string without building a Utf8
+      decoder.skipString();
+    }
   }
 
   private static class UUIDReader implements ValueReader<UUID> {
@@ -275,6 +415,11 @@ public class ValueReaders {
 
       return UUIDUtil.convert(buffer);
     }
+
+    @Override
+    public void skip(Decoder decoder) throws IOException {
+      // advance past the 16-byte fixed value without converting it to a UUID
+      decoder.skipFixed(16);
+    }
   }
 
   private static class FixedReader implements ValueReader<byte[]> {
@@ -298,6 +443,11 @@ public class ValueReaders {
       decoder.readFixed(bytes, 0, length);
       return bytes;
     }
+
+    @Override
+    public void skip(Decoder decoder) throws IOException {
+      // skip the same number of bytes that read would copy out
+      decoder.skipFixed(length);
+    }
   }
 
   private static class GenericFixedReader implements 
ValueReader<GenericData.Fixed> {
@@ -323,6 +473,11 @@ public class ValueReaders {
       decoder.readFixed(bytes, 0, length);
       return new GenericData.Fixed(schema, bytes);
     }
+
+    @Override
+    public void skip(Decoder decoder) throws IOException {
+      // skip the same number of bytes that read would copy out; no GenericData.Fixed is created
+      decoder.skipFixed(length);
+    }
   }
 
   private static class BytesReader implements ValueReader<byte[]> {
@@ -344,6 +499,11 @@ public class ValueReaders {
       //      decoder.readFixed(bytes, 0, length);
       //      return bytes;
     }
+
+    @Override
+    public void skip(Decoder decoder) throws IOException {
+      // let the decoder step over the bytes value without copying it into an array
+      decoder.skipBytes();
+    }
   }
 
   private static class ByteBufferReader implements ValueReader<ByteBuffer> {
@@ -364,6 +524,11 @@ public class ValueReaders {
       //      decoder.readFixed(bytes, 0, length);
       //      return bytes;
     }
+
+    @Override
+    public void skip(Decoder decoder) throws IOException {
+      // let the decoder step over the bytes value without filling a ByteBuffer
+      decoder.skipBytes();
+    }
   }
 
   private static class DecimalReader implements ValueReader<BigDecimal> {
@@ -381,6 +546,11 @@ public class ValueReaders {
       byte[] bytes = bytesReader.read(decoder, null);
       return new BigDecimal(new BigInteger(bytes), scale);
     }
+
+    @Override
+    public void skip(Decoder decoder) throws IOException {
+      // the decimal is read through bytesReader, so skipping is delegated to it as well
+      bytesReader.skip(decoder);
+    }
   }
 
   private static class UnionReader implements ValueReader<Object> {
@@ -398,6 +568,12 @@ public class ValueReaders {
       int index = decoder.readIndex();
       return readers[index].read(decoder, reuse);
     }
+
+    @Override
+    public void skip(Decoder decoder) throws IOException {
+      // the branch index must still be decoded: it selects the reader that knows how to skip
+      // the value that follows
+      int index = decoder.readIndex();
+      readers[index].skip(decoder);
+    }
   }
 
   private static class EnumReader implements ValueReader<String> {
@@ -415,6 +591,11 @@ public class ValueReaders {
       int index = decoder.readEnum();
       return symbols[index];
     }
+
+    @Override
+    public void skip(Decoder decoder) throws IOException {
+      // decode and discard the enum index; the symbol lookup done by read is not needed
+      decoder.readEnum();
+    }
   }
 
   private static class ArrayReader<T> implements ValueReader<Collection<T>> {
@@ -456,6 +637,16 @@ public class ValueReaders {
 
       return resultList;
     }
+
+    /**
+     * Skips an entire array without materializing its elements.
+     *
+     * <p>{@code Decoder.skipArray()} is called until it returns 0. A non-zero result is the number
+     * of elements that must be skipped one at a time with the element reader.
+     */
+    @Override
+    public void skip(Decoder decoder) throws IOException {
+      long itemsToSkip;
+      while ((itemsToSkip = decoder.skipArray()) != 0) {
+        // the count is a long, so the index is too: an int index would overflow for large counts
+        for (long i = 0; i < itemsToSkip; i += 1) {
+          elementReader.skip(decoder);
+        }
+      }
+    }
   }
 
   private static class ArrayMapReader<K, V> implements ValueReader<Map<K, V>> {
@@ -509,6 +700,17 @@ public class ValueReaders {
 
       return resultMap;
     }
+
+    /**
+     * Skips an entire map that is encoded as an array of key/value entries.
+     *
+     * <p>{@code Decoder.skipArray()} is called until it returns 0. A non-zero result is the number
+     * of entries that must be skipped one at a time, key first and then value.
+     */
+    @Override
+    public void skip(Decoder decoder) throws IOException {
+      long itemsToSkip;
+      while ((itemsToSkip = decoder.skipArray()) != 0) {
+        // the count is a long, so the index is too: an int index would overflow for large counts
+        for (long i = 0; i < itemsToSkip; i += 1) {
+          keyReader.skip(decoder);
+          valueReader.skip(decoder);
+        }
+      }
+    }
   }
 
   private static class MapReader<K, V> implements ValueReader<Map<K, V>> {
@@ -562,6 +764,133 @@ public class ValueReaders {
 
       return resultMap;
     }
+
+    /**
+     * Skips an entire map without materializing its entries.
+     *
+     * <p>{@code Decoder.skipMap()} is called until it returns 0. A non-zero result is the number
+     * of entries that must be skipped one at a time, key first and then value.
+     */
+    @Override
+    public void skip(Decoder decoder) throws IOException {
+      long itemsToSkip;
+      while ((itemsToSkip = decoder.skipMap()) != 0) {
+        // the count is a long, so the index is too: an int index would overflow for large counts
+        for (long i = 0; i < itemsToSkip; i += 1) {
+          keyReader.skip(decoder);
+          valueReader.skip(decoder);
+        }
+      }
+    }
+  }
+
+  /**
+   * Base class for struct readers that follow a read plan.
+   *
+   * <p>The plan is an ordered list of (position, reader) pairs. Readers are invoked in plan
+   * order; a value is stored in the result at its position, or skipped when the position is null.
+   *
+   * @param <S> the struct type produced by this reader
+   */
+  public abstract static class PlannedStructReader<S>
+      implements ValueReader<S>, SupportsRowPosition {
+    private final ValueReader<?>[] readers;
+    private final Integer[] positions;
+
+    protected PlannedStructReader(List<Pair<Integer, ValueReader<?>>> readPlan) {
+      // split the plan into parallel arrays so the read loop can index both directly
+      this.readers = new ValueReader<?>[readPlan.size()];
+      this.positions = new Integer[readPlan.size()];
+      int index = 0;
+      for (Pair<Integer, ValueReader<?>> step : readPlan) {
+        this.readers[index] = step.second();
+        this.positions[index] = step.first();
+        index += 1;
+      }
+    }
+
+    @Override
+    public void setRowPositionSupplier(Supplier<Long> posSupplier) {
+      // pass the supplier on to every child reader that accepts one
+      for (ValueReader<?> child : readers) {
+        if (!(child instanceof SupportsRowPosition)) {
+          continue;
+        }
+
+        ((SupportsRowPosition) child).setRowPositionSupplier(posSupplier);
+      }
+    }
+
+    /** Returns {@code reuse} if it can hold the result, or a new struct otherwise. */
+    protected abstract S reuseOrCreate(Object reuse);
+
+    /** Returns the value currently stored in {@code struct} at {@code pos}. */
+    protected abstract Object get(S struct, int pos);
+
+    /** Stores {@code value} in {@code struct} at {@code pos}. */
+    protected abstract void set(S struct, int pos, Object value);
+
+    @Override
+    public S read(Decoder decoder, Object reuse) throws IOException {
+      S result = reuseOrCreate(reuse);
+
+      for (int step = 0; step < readers.length; step += 1) {
+        ValueReader<?> reader = readers[step];
+        Integer pos = positions[step];
+        if (pos == null) {
+          // no position in the result: the value is not projected
+          reader.skip(decoder);
+          continue;
+        }
+
+        // offer the value already stored at this position to the reader for reuse
+        Object previous = get(result, pos);
+        set(result, pos, reader.read(decoder, previous));
+      }
+
+      return result;
+    }
+
+    @Override
+    public void skip(Decoder decoder) throws IOException {
+      // every planned value is skipped, in plan order, whether or not it is projected
+      for (ValueReader<?> reader : readers) {
+        reader.skip(decoder);
+      }
+    }
+  }
+
+  /** Planned struct reader that produces {@code GenericData.Record} instances. */
+  private static class PlannedRecordReader extends PlannedStructReader<GenericData.Record> {
+    private final Schema recordSchema;
+
+    private PlannedRecordReader(Schema recordSchema, List<Pair<Integer, ValueReader<?>>> readPlan) {
+      super(readPlan);
+      this.recordSchema = recordSchema;
+    }
+
+    @Override
+    protected GenericData.Record reuseOrCreate(Object reuse) {
+      // reuse any generic record that is passed in; otherwise start from an empty one
+      return reuse instanceof GenericData.Record
+          ? (GenericData.Record) reuse
+          : new GenericData.Record(recordSchema);
+    }
+
+    @Override
+    protected Object get(GenericData.Record record, int pos) {
+      return record.get(pos);
+    }
+
+    @Override
+    protected void set(GenericData.Record record, int pos, Object value) {
+      record.put(pos, value);
+    }
+  }
+
+  /**
+   * Planned struct reader that produces instances of a specific {@code IndexedRecord} class.
+   *
+   * @param <R> the record class produced by this reader
+   */
+  private static class PlannedIndexedReader<R extends IndexedRecord>
+      extends PlannedStructReader<R> {
+    private final Class<R> recordClass;
+    private final DynConstructors.Ctor<R> ctor;
+    private final Schema schema;
+
+    PlannedIndexedReader(
+        Schema recordSchema, Class<R> recordClass, List<Pair<Integer, ValueReader<?>>> readPlan) {
+      super(readPlan);
+      this.schema = recordSchema;
+      this.recordClass = recordClass;
+      // resolve the constructor once here so that creating records per row is cheap
+      this.ctor =
+          DynConstructors.builder(IndexedRecord.class)
+              .hiddenImpl(recordClass, Schema.class)
+              .hiddenImpl(recordClass)
+              .build();
+    }
+
+    @Override
+    protected R reuseOrCreate(Object reuse) {
+      // only an instance of the requested class can be reused
+      return recordClass.isInstance(reuse) ? recordClass.cast(reuse) : ctor.newInstance(schema);
+    }
+
+    @Override
+    protected Object get(R record, int pos) {
+      return record.get(pos);
+    }
+
+    @Override
+    protected void set(R record, int pos, Object value) {
+      record.put(pos, value);
+    }
   }
 
   public abstract static class StructReader<S> implements ValueReader<S>, 
SupportsRowPosition {
@@ -577,10 +906,11 @@ public class ValueReaders {
       List<Schema.Field> fields = schema.getFields();
       for (int pos = 0; pos < fields.size(); pos += 1) {
         Schema.Field field = fields.get(pos);
-        if (AvroSchemaUtil.getFieldId(field) == 
MetadataColumns.ROW_POSITION.fieldId()) {
+        if (Objects.equals(AvroSchemaUtil.fieldId(field), 
MetadataColumns.ROW_POSITION.fieldId())) {
           // track where the _pos field is located for setRowPositionSupplier
           this.posField = pos;
-        } else if (AvroSchemaUtil.getFieldId(field) == 
MetadataColumns.IS_DELETED.fieldId()) {
+        } else if (Objects.equals(
+            AvroSchemaUtil.fieldId(field), 
MetadataColumns.IS_DELETED.fieldId())) {
           isDeletedColumnPos = pos;
         }
       }
@@ -622,19 +952,12 @@ public class ValueReaders {
     @Override
     public void setRowPositionSupplier(Supplier<Long> posSupplier) {
       if (posField >= 0) {
-        long startingPos = posSupplier.get();
-        this.readers[posField] = new PositionReader(startingPos);
-        for (ValueReader<?> reader : readers) {
-          if (reader instanceof SupportsRowPosition) {
-            ((SupportsRowPosition) reader).setRowPositionSupplier(() -> 
startingPos);
-          }
-        }
+        // install a position reader for the _pos field. it has no starting position yet; it
+        // implements SupportsRowPosition, so the loop below hands it the supplier
+        this.readers[posField] = new PositionReader();
+      }
 
-      } else {
-        for (ValueReader<?> reader : readers) {
-          if (reader instanceof SupportsRowPosition) {
-            ((SupportsRowPosition) reader).setRowPositionSupplier(posSupplier);
-          }
+      // pass the supplier on to every child reader that accepts one
+      for (ValueReader<?> reader : readers) {
+        if (reader instanceof SupportsRowPosition) {
+          ((SupportsRowPosition) reader).setRowPositionSupplier(posSupplier);
         }
       }
     }
@@ -739,17 +1062,21 @@ public class ValueReaders {
     }
   }
 
-  static class PositionReader implements ValueReader<Long> {
-    private long currentPosition;
-
-    PositionReader(long rowPosition) {
-      this.currentPosition = rowPosition - 1;
-    }
+  /**
+   * Reader that produces row positions by counting, without consuming anything from the decoder.
+   *
+   * <p>The counter holds the last position handed out. {@link #setRowPositionSupplier} sets it
+   * to one before the supplied starting position, so the next read returns the starting position.
+   */
+  static class PositionReader implements ValueReader<Long>, SupportsRowPosition {
+    // NOTE(review): if no supplier is ever set, the first read returns 1, not 0 -- confirm that
+    // every caller sets a supplier before reading
+    private long currentPosition = 0;
 
     @Override
     public Long read(Decoder ignored, Object reuse) throws IOException {
-      currentPosition += 1;
+      this.currentPosition += 1;
       return currentPosition;
     }
+
+    @Override
+    public void skip(Decoder ignored) throws IOException {
+      // a skipped row still occupies a position, so advance the counter to keep later reads
+      // aligned with the rows they belong to
+      this.currentPosition += 1;
+    }
+
+    @Override
+    public void setRowPositionSupplier(Supplier<Long> posSupplier) {
+      // the supplier is queried once, here; read and skip count forward from its value
+      this.currentPosition = posSupplier.get() - 1;
+    }
   }
 }
diff --git 
a/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java 
b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java
index f4c7ee883e..2af098445c 100644
--- a/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java
+++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java
@@ -80,10 +80,7 @@ public class TestAvroNameMapping extends 
TestAvroReadProjection {
 
     Record projected = writeAndRead(writeSchema, readSchema, record, 
nameMapping);
     // field id 5 comes from read schema
-    Assertions.assertThat(projected.getSchema().getField("location_r5"))
-        .as("Field missing from table mapping is renamed")
-        .isNotNull();
-    Assertions.assertThat(projected.get("location_r5"))
+    Assertions.assertThat(projected.get("location"))
         .as("location field should not be read")
         .isNull();
     Assertions.assertThat(projected.get("id")).isEqualTo(34L);
@@ -105,10 +102,7 @@ public class TestAvroNameMapping extends 
TestAvroReadProjection {
 
     projected = writeAndRead(writeSchema, readSchema, record, nameMapping);
     Record projectedL1 = ((Map<String, Record>) 
projected.get("location")).get("l1");
-    Assertions.assertThat(projectedL1.getSchema().getField("long_r2"))
-        .as("Field missing from table mapping is renamed")
-        .isNotNull();
-    Assertions.assertThat(projectedL1.get("long_r2"))
+    Assertions.assertThat(projectedL1.get("long"))
         .as("location.value.long, should not be read")
         .isNull();
   }
@@ -187,8 +181,7 @@ public class TestAvroNameMapping extends 
TestAvroReadProjection {
             Comparators.charSequences().compare("k2", (CharSequence) 
projectedKey.get("k2")))
         .isEqualTo(0);
     Assertions.assertThat(projectedValue.get("lat")).isEqualTo(52.995143f);
-    
Assertions.assertThat(projectedValue.getSchema().getField("long_r2")).isNotNull();
-    Assertions.assertThat(projectedValue.get("long_r2")).isNull();
+    Assertions.assertThat(projectedValue.get("long")).isNull();
   }
 
   @Test
@@ -249,10 +242,7 @@ public class TestAvroNameMapping extends 
TestAvroReadProjection {
     Schema readSchema = writeSchema;
 
     Record projected = writeAndRead(writeSchema, readSchema, record, 
nameMapping);
-    Assertions.assertThat(projected.getSchema().getField("point_r22"))
-        .as("Field missing from table mapping is renamed")
-        .isNotNull();
-    Assertions.assertThat(projected.get("point_r22")).as("point field is not 
projected").isNull();
+    Assertions.assertThat(projected.get("point")).as("point is not 
projected").isNull();
     Assertions.assertThat(projected.get("id")).isEqualTo(34L);
     // point array is partially projected
     nameMapping =
@@ -269,11 +259,8 @@ public class TestAvroNameMapping extends 
TestAvroReadProjection {
 
     projected = writeAndRead(writeSchema, readSchema, record, nameMapping);
     Record point = ((List<Record>) projected.get("point")).get(0);
-    Assertions.assertThat(point.getSchema().getField("y_r18"))
-        .as("Field missing from table mapping is renamed")
-        .isNotNull();
     Assertions.assertThat(point.get("x")).as("point.x is 
projected").isEqualTo(1);
-    Assertions.assertThat(point.get("y_r18")).as("point.y is not 
projected").isNull();
+    Assertions.assertThat(point.get("y")).as("point.y is not 
projected").isNull();
     Assertions.assertThat(projected.get("id")).isEqualTo(34L);
   }
 

Reply via email to