This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a3cd22  Parquet: Support name mappings to recover field IDs (#830)
5a3cd22 is described below

commit 5a3cd22e775dfa8bf79deab675390aad48ba79a5
Author: Chen, Junjie <[email protected]>
AuthorDate: Thu Jun 18 00:57:08 2020 +0800

    Parquet: Support name mappings to recover field IDs (#830)
---
 .../main/java/org/apache/iceberg/avro/Avro.java    |   2 +-
 .../apache/iceberg/avro/TestAvroNameMapping.java   |   2 +-
 .../apache/iceberg/parquet/ApplyNameMapping.java   | 110 +++++++++++++++++++++
 .../java/org/apache/iceberg/parquet/Parquet.java   |  25 ++++-
 .../apache/iceberg/parquet/ParquetReadSupport.java |  17 +++-
 .../org/apache/iceberg/parquet/ParquetReader.java  |   8 +-
 .../apache/iceberg/parquet/ParquetSchemaUtil.java  |  58 ++++++++---
 .../org/apache/iceberg/parquet/PruneColumns.java   |  33 ++++---
 .../java/org/apache/iceberg/parquet/ReadConf.java  |  23 +++--
 .../iceberg/parquet/VectorizedParquetReader.java   |  10 +-
 .../iceberg/parquet/TestParquetSchemaUtil.java     |  95 ++++++++++++++++++
 .../iceberg/spark/data/SparkParquetReaders.java    |   8 +-
 .../vectorized/VectorizedSparkParquetReaders.java  |   4 +
 .../iceberg/spark/source/BatchDataReader.java      |  16 ++-
 .../org/apache/iceberg/spark/source/Reader.java    |  35 ++++---
 .../apache/iceberg/spark/source/RowDataReader.java |  18 +++-
 .../iceberg/spark/source/RowDataRewriter.java      |   7 +-
 .../iceberg/spark/source/TestSparkTableUtil.java   |  98 ++++++++++++++++++
 18 files changed, 487 insertions(+), 82 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java
index 689a1a8..24716e3 100644
--- a/core/src/main/java/org/apache/iceberg/avro/Avro.java
+++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java
@@ -235,7 +235,7 @@ public class Avro {
       return this;
     }
 
-    public ReadBuilder nameMapping(NameMapping newNameMapping) {
+    public ReadBuilder withNameMapping(NameMapping newNameMapping) {
       this.nameMapping = newNameMapping;
       return this;
     }
diff --git a/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java
index 02025a7..3a3c28c 100644
--- a/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java
+++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java
@@ -315,7 +315,7 @@ public class TestAvroNameMapping extends TestAvroReadProjection {
 
     Iterable<GenericData.Record> records = Avro.read(Files.localInput(file))
         .project(readSchema)
-        .nameMapping(nameMapping)
+        .withNameMapping(nameMapping)
         .build();
 
     return Iterables.getOnlyElement(records);
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ApplyNameMapping.java b/parquet/src/main/java/org/apache/iceberg/parquet/ApplyNameMapping.java
new file mode 100644
index 0000000..85d6122
--- /dev/null
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ApplyNameMapping.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.apache.iceberg.mapping.MappedField;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
+
+class ApplyNameMapping extends ParquetTypeVisitor<Type> {
+  private final NameMapping nameMapping;
+
+  ApplyNameMapping(NameMapping nameMapping) {
+    this.nameMapping = nameMapping;
+  }
+
+  @Override
+  public Type message(MessageType message, List<Type> fields) {
+    Types.MessageTypeBuilder builder = 
org.apache.parquet.schema.Types.buildMessage();
+    fields.stream().filter(Objects::nonNull).forEach(builder::addField);
+
+    return builder.named(message.getName());
+  }
+
+  @Override
+  public Type struct(GroupType struct, List<Type> types) {
+    MappedField field = nameMapping.find(currentPath());
+    List<Type> actualTypes = 
types.stream().filter(Objects::nonNull).collect(Collectors.toList());
+    Type structType = struct.withNewFields(actualTypes);
+
+    return field == null ? structType : structType.withId(field.id());
+  }
+
+  @Override
+  public Type list(GroupType list, Type elementType) {
+    Preconditions.checkArgument(elementType != null,
+        "List type must have element field");
+
+    MappedField field = nameMapping.find(currentPath());
+    Type listType = org.apache.parquet.schema.Types.list(list.getRepetition())
+        .element(elementType)
+        .named(list.getName());
+
+    return field == null ? listType : listType.withId(field.id());
+  }
+
+  @Override
+  public Type map(GroupType map, Type keyType, Type valueType) {
+    Preconditions.checkArgument(keyType != null && valueType != null,
+        "Map type must have both key field and value field");
+
+    MappedField field = nameMapping.find(currentPath());
+    Type mapType = org.apache.parquet.schema.Types.map(map.getRepetition())
+        .key(keyType)
+        .value(valueType)
+        .named(map.getName());
+
+    return field == null ? mapType : mapType.withId(field.id());
+  }
+
+  @Override
+  public Type primitive(PrimitiveType primitive) {
+    MappedField field = nameMapping.find(currentPath());
+    return field == null ? primitive : primitive.withId(field.id());
+  }
+
+  @Override
+  public void beforeRepeatedElement(Type element) {
+    // do not add the repeated element's name
+  }
+
+  @Override
+  public void afterRepeatedElement(Type element) {
+    // do not remove the repeated element's name
+  }
+
+  @Override
+  public void beforeRepeatedKeyValue(Type keyValue) {
+    // do not add the repeated element's name
+  }
+
+  @Override
+  public void afterRepeatedKeyValue(Type keyValue) {
+    // do not remove the repeated element's name
+  }
+}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index df23785..fa6c80d 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -38,6 +38,7 @@ import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -312,6 +313,7 @@ public class Parquet {
     private boolean callInit = false;
     private boolean reuseContainers = false;
     private int maxRecordsPerBatch = 10000;
+    private NameMapping nameMapping = null;
 
     private ReadBuilder(InputFile file) {
       this.file = file;
@@ -393,6 +395,11 @@ public class Parquet {
       return this;
     }
 
+    public ReadBuilder withNameMapping(NameMapping newNameMapping) {
+      this.nameMapping = newNameMapping;
+      return this;
+    }
+
     @SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity"})
     public <D> CloseableIterable<D> build() {
       if (readerFunc != null || batchedReaderFunc != null) {
@@ -419,11 +426,11 @@ public class Parquet {
         ParquetReadOptions options = optionsBuilder.build();
 
         if (batchedReaderFunc != null) {
-          return new VectorizedParquetReader(file, schema, options, 
batchedReaderFunc, filter, reuseContainers,
-              caseSensitive, maxRecordsPerBatch);
+          return new VectorizedParquetReader(file, schema, options, 
batchedReaderFunc, nameMapping, filter,
+              reuseContainers, caseSensitive, maxRecordsPerBatch);
         } else {
           return new org.apache.iceberg.parquet.ParquetReader<>(
-              file, schema, options, readerFunc, filter, reuseContainers, 
caseSensitive);
+              file, schema, options, readerFunc, nameMapping, filter, 
reuseContainers, caseSensitive);
         }
       }
 
@@ -475,6 +482,10 @@ public class Parquet {
         builder.withFileRange(start, start + length);
       }
 
+      if (nameMapping != null) {
+        builder.withNameMapping(nameMapping);
+      }
+
       return new ParquetIterable<>(builder);
     }
   }
@@ -483,6 +494,7 @@ public class Parquet {
     private Schema schema = null;
     private ReadSupport<T> readSupport = null;
     private boolean callInit = false;
+    private NameMapping nameMapping = null;
 
     private ParquetReadBuilder(org.apache.parquet.io.InputFile file) {
       super(file);
@@ -493,6 +505,11 @@ public class Parquet {
       return this;
     }
 
+    public ParquetReadBuilder<T> withNameMapping(NameMapping newNameMapping) {
+      this.nameMapping = newNameMapping;
+      return this;
+    }
+
     public ParquetReadBuilder<T> readSupport(ReadSupport<T> newReadSupport) {
       this.readSupport = newReadSupport;
       return this;
@@ -505,7 +522,7 @@ public class Parquet {
 
     @Override
     protected ReadSupport<T> getReadSupport() {
-      return new ParquetReadSupport<>(schema, readSupport, callInit);
+      return new ParquetReadSupport<>(schema, readSupport, callInit, 
nameMapping);
     }
   }
 }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java
index bedc91a..645b3fa 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.parquet.avro.AvroReadSupport;
@@ -41,11 +42,13 @@ class ParquetReadSupport<T> extends ReadSupport<T> {
   private final Schema expectedSchema;
   private final ReadSupport<T> wrapped;
   private final boolean callInit;
+  private final NameMapping nameMapping;
 
-  ParquetReadSupport(Schema expectedSchema, ReadSupport<T> readSupport, 
boolean callInit) {
+  ParquetReadSupport(Schema expectedSchema, ReadSupport<T> readSupport, 
boolean callInit, NameMapping nameMapping) {
     this.expectedSchema = expectedSchema;
     this.wrapped = readSupport;
     this.callInit = callInit;
+    this.nameMapping = nameMapping;
   }
 
   @Override
@@ -55,9 +58,15 @@ class ParquetReadSupport<T> extends ReadSupport<T> {
     // matching to the file's columns by full path, so this must select 
columns by using the path
     // in the file's schema.
 
-    MessageType projection = ParquetSchemaUtil.hasIds(fileSchema) ?
-        ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema) :
-        ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema);
+    MessageType projection;
+    if (ParquetSchemaUtil.hasIds(fileSchema)) {
+      projection = ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema);
+    } else if (nameMapping != null) {
+      MessageType typeWithIds = ParquetSchemaUtil.applyNameMapping(fileSchema, 
nameMapping);
+      projection = ParquetSchemaUtil.pruneColumns(typeWithIds, expectedSchema);
+    } else {
+      projection = ParquetSchemaUtil.pruneColumnsFallback(fileSchema, 
expectedSchema);
+    }
 
     // override some known backward-compatibility options
     configuration.set("parquet.strict.typing", "false");
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
index 17d96c1..d61e4f4 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.io.CloseableGroup;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMapping;
 import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.hadoop.ParquetFileReader;
@@ -42,9 +43,10 @@ public class ParquetReader<T> extends CloseableGroup implements CloseableIterabl
   private final Expression filter;
   private final boolean reuseContainers;
   private final boolean caseSensitive;
+  private final NameMapping nameMapping;
 
   public ParquetReader(InputFile input, Schema expectedSchema, 
ParquetReadOptions options,
-                       Function<MessageType, ParquetValueReader<?>> readerFunc,
+                       Function<MessageType, ParquetValueReader<?>> 
readerFunc, NameMapping nameMapping,
                        Expression filter, boolean reuseContainers, boolean 
caseSensitive) {
     this.input = input;
     this.expectedSchema = expectedSchema;
@@ -54,6 +56,7 @@ public class ParquetReader<T> extends CloseableGroup implements CloseableIterabl
     this.filter = filter == Expressions.alwaysTrue() ? null : filter;
     this.reuseContainers = reuseContainers;
     this.caseSensitive = caseSensitive;
+    this.nameMapping = nameMapping;
   }
 
   private ReadConf<T> conf = null;
@@ -61,7 +64,8 @@ public class ParquetReader<T> extends CloseableGroup implements CloseableIterabl
   private ReadConf<T> init() {
     if (conf == null) {
       ReadConf<T> readConf = new ReadConf<>(
-          input, options, expectedSchema, filter, readerFunc, null, 
reuseContainers, caseSensitive, null);
+          input, options, expectedSchema, filter, readerFunc, null, 
nameMapping, reuseContainers,
+          caseSensitive, null);
       this.conf = readConf.copy();
       return readConf;
     }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java
index 86dcf6b..2460096 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java
@@ -19,12 +19,16 @@
 
 package org.apache.iceberg.parquet;
 
+import java.util.List;
 import java.util.Set;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
+import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
 import org.apache.parquet.schema.Types.MessageTypeBuilder;
 
@@ -83,22 +87,7 @@ public class ParquetSchemaUtil {
   }
 
   public static boolean hasIds(MessageType fileSchema) {
-    try {
-      // Try to convert the type to Iceberg. If an ID assignment is needed, 
return false.
-      ParquetTypeVisitor.visit(fileSchema, new MessageTypeToType(fileSchema) {
-        @Override
-        protected int nextId() {
-          throw new IllegalStateException("Needed to assign ID");
-        }
-      });
-
-      // no assignment was needed
-      return true;
-
-    } catch (IllegalStateException e) {
-      // at least one field was missing an id.
-      return false;
-    }
+    return ParquetTypeVisitor.visit(fileSchema, new HasIds());
   }
 
   public static MessageType addFallbackIds(MessageType fileSchema) {
@@ -112,4 +101,41 @@ public class ParquetSchemaUtil {
 
     return builder.named(fileSchema.getName());
   }
+
+  public static MessageType applyNameMapping(MessageType fileSchema, 
NameMapping nameMapping) {
+    return (MessageType) ParquetTypeVisitor.visit(fileSchema, new 
ApplyNameMapping(nameMapping));
+  }
+
+  public static class HasIds extends ParquetTypeVisitor<Boolean> {
+    @Override
+    public Boolean message(MessageType message, List<Boolean> fields) {
+      return struct(message, fields);
+    }
+
+    @Override
+    public Boolean struct(GroupType struct, List<Boolean> hasIds) {
+      for (Boolean hasId : hasIds) {
+        if (hasId) {
+          return true;
+        }
+      }
+      return struct.getId() != null;
+    }
+
+    @Override
+    public Boolean list(GroupType array, Boolean hasId) {
+      return hasId || array.getId() != null;
+    }
+
+    @Override
+    public Boolean map(GroupType map, Boolean keyHasId, Boolean valueHasId) {
+      return keyHasId || valueHasId || map.getId() != null;
+    }
+
+    @Override
+    public Boolean primitive(PrimitiveType primitive) {
+      return primitive.getId() != null;
+    }
+  }
+
 }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java b/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java
index bd5db40..d05db46 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java
@@ -21,7 +21,6 @@ package org.apache.iceberg.parquet;
 
 import java.util.List;
 import java.util.Set;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
@@ -45,7 +44,8 @@ class PruneColumns extends ParquetTypeVisitor<Type> {
     for (int i = 0; i < fields.size(); i += 1) {
       Type originalField = message.getType(i);
       Type field = fields.get(i);
-      if (selectedIds.contains(getId(originalField))) {
+      Integer fieldId = getId(originalField);
+      if (fieldId != null && selectedIds.contains(fieldId)) {
         builder.addField(originalField);
         fieldCount += 1;
       } else if (field != null) {
@@ -71,7 +71,8 @@ class PruneColumns extends ParquetTypeVisitor<Type> {
     for (int i = 0; i < fields.size(); i += 1) {
       Type originalField = struct.getType(i);
       Type field = fields.get(i);
-      if (selectedIds.contains(getId(originalField))) {
+      Integer fieldId = getId(originalField);
+      if (fieldId != null && selectedIds.contains(fieldId)) {
         filteredFields.add(originalField);
       } else if (field != null) {
         filteredFields.add(originalField);
@@ -94,17 +95,18 @@ class PruneColumns extends ParquetTypeVisitor<Type> {
   public Type list(GroupType list, Type element) {
     GroupType repeated = list.getType(0).asGroupType();
     Type originalElement = repeated.getType(0);
-    int elementId = getId(originalElement);
+    Integer elementId = getId(originalElement);
 
-    if (selectedIds.contains(elementId)) {
+    if (elementId != null && selectedIds.contains(elementId)) {
       return list;
     } else if (element != null) {
       if (element != originalElement) {
+        Integer listId = getId(list);
         // the element type was projected
-        return Types.list(list.getRepetition())
+        Type listType = Types.list(list.getRepetition())
             .element(element)
-            .id(getId(list))
             .named(list.getName());
+        return listId == null ? listType : listType.withId(listId);
       }
       return list;
     }
@@ -118,18 +120,20 @@ class PruneColumns extends ParquetTypeVisitor<Type> {
     Type originalKey = repeated.getType(0);
     Type originalValue = repeated.getType(1);
 
-    int keyId = getId(originalKey);
-    int valueId = getId(originalValue);
+    Integer keyId = getId(originalKey);
+    Integer valueId = getId(originalValue);
 
-    if (selectedIds.contains(keyId) || selectedIds.contains(valueId)) {
+    if ((keyId != null && selectedIds.contains(keyId)) || (valueId != null && 
selectedIds.contains(valueId))) {
       return map;
     } else if (value != null) {
+      Integer mapId = getId(map);
       if (value != originalValue) {
-        return Types.map(map.getRepetition())
+        Type mapType =  Types.map(map.getRepetition())
             .key(originalKey)
             .value(value)
-            .id(getId(map))
             .named(map.getName());
+
+        return mapId == null ? mapType : mapType.withId(mapId);
       }
       return map;
     }
@@ -142,8 +146,7 @@ class PruneColumns extends ParquetTypeVisitor<Type> {
     return null;
   }
 
-  private int getId(Type type) {
-    Preconditions.checkNotNull(type.getId(), "Missing id for type: %s", type);
-    return type.getId().intValue();
+  private Integer getId(Type type) {
+    return type.getId() == null ? null : type.getId().intValue();
   }
 }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
index e25730b..85f9eb2 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
@@ -25,11 +25,11 @@ import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
-import javax.annotation.Nullable;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.parquet.ParquetReadOptions;
@@ -49,15 +49,12 @@ class ReadConf<T> {
   private final InputFile file;
   private final ParquetReadOptions options;
   private final MessageType projection;
-  @Nullable
   private final ParquetValueReader<T> model;
-  @Nullable
   private final VectorizedReader<T> vectorizedModel;
   private final List<BlockMetaData> rowGroups;
   private final boolean[] shouldSkip;
   private final long totalValues;
   private final boolean reuseContainers;
-  @Nullable
   private final Integer batchSize;
 
   // List of column chunk metadata for each row group
@@ -66,19 +63,25 @@ class ReadConf<T> {
   @SuppressWarnings("unchecked")
   ReadConf(InputFile file, ParquetReadOptions options, Schema expectedSchema, 
Expression filter,
            Function<MessageType, ParquetValueReader<?>> readerFunc, 
Function<MessageType,
-           VectorizedReader<?>> batchedReaderFunc, boolean reuseContainers,
+           VectorizedReader<?>> batchedReaderFunc, NameMapping nameMapping, 
boolean reuseContainers,
            boolean caseSensitive, Integer bSize) {
     this.file = file;
     this.options = options;
     this.reader = newReader(file, options);
     MessageType fileSchema = reader.getFileMetaData().getSchema();
 
-    boolean hasIds = ParquetSchemaUtil.hasIds(fileSchema);
-    MessageType typeWithIds = hasIds ? fileSchema : 
ParquetSchemaUtil.addFallbackIds(fileSchema);
+    MessageType typeWithIds;
+    if (ParquetSchemaUtil.hasIds(fileSchema)) {
+      typeWithIds = fileSchema;
+      this.projection = ParquetSchemaUtil.pruneColumns(fileSchema, 
expectedSchema);
+    } else if (nameMapping != null) {
+      typeWithIds = ParquetSchemaUtil.applyNameMapping(fileSchema, 
nameMapping);
+      this.projection = ParquetSchemaUtil.pruneColumns(typeWithIds, 
expectedSchema);
+    } else {
+      typeWithIds = ParquetSchemaUtil.addFallbackIds(fileSchema);
+      this.projection = ParquetSchemaUtil.pruneColumnsFallback(fileSchema, 
expectedSchema);
+    }
 
-    this.projection = hasIds ?
-        ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema) :
-        ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema);
     this.rowGroups = reader.getRowGroups();
     this.shouldSkip = new boolean[rowGroups.size()];
 
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java
index 6cb9da5..481012c 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java
@@ -32,6 +32,7 @@ import org.apache.iceberg.io.CloseableGroup;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMapping;
 import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.hadoop.ParquetFileReader;
@@ -48,11 +49,12 @@ public class VectorizedParquetReader<T> extends CloseableGroup implements Closea
   private boolean reuseContainers;
   private final boolean caseSensitive;
   private final int batchSize;
+  private final NameMapping nameMapping;
 
   public VectorizedParquetReader(
       InputFile input, Schema expectedSchema, ParquetReadOptions options,
-      Function<MessageType, VectorizedReader<?>> readerFunc,
-      Expression filter, boolean reuseContainers, boolean caseSensitive, int 
maxRecordsPerBatch) {
+      Function<MessageType, VectorizedReader<?>> readerFunc, NameMapping 
nameMapping, Expression filter,
+      boolean reuseContainers, boolean caseSensitive, int maxRecordsPerBatch) {
     this.input = input;
     this.expectedSchema = expectedSchema;
     this.options = options;
@@ -62,6 +64,7 @@ public class VectorizedParquetReader<T> extends CloseableGroup implements Closea
     this.reuseContainers = reuseContainers;
     this.caseSensitive = caseSensitive;
     this.batchSize = maxRecordsPerBatch;
+    this.nameMapping = nameMapping;
   }
 
   private ReadConf conf = null;
@@ -69,7 +72,8 @@ public class VectorizedParquetReader<T> extends CloseableGroup implements Closea
   private ReadConf init() {
     if (conf == null) {
       ReadConf readConf = new ReadConf(
-          input, options, expectedSchema, filter, null, batchReaderFunc, 
reuseContainers, caseSensitive, batchSize);
+          input, options, expectedSchema, filter, null, batchReaderFunc, 
nameMapping, reuseContainers,
+          caseSensitive, batchSize);
       this.conf = readConf.copy();
       return readConf;
     }
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetSchemaUtil.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetSchemaUtil.java
new file mode 100644
index 0000000..f61ca5e
--- /dev/null
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetSchemaUtil.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.mapping.MappingUtil;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.parquet.schema.MessageType;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestParquetSchemaUtil {
+  private static final Types.StructType SUPPORTED_PRIMITIVES = 
Types.StructType.of(
+      required(100, "id", Types.LongType.get()),
+      optional(101, "data", Types.StringType.get()),
+      required(102, "b", Types.BooleanType.get()),
+      optional(103, "i", Types.IntegerType.get()),
+      required(104, "l", Types.LongType.get()),
+      optional(105, "f", Types.FloatType.get()),
+      required(106, "d", Types.DoubleType.get()),
+      optional(107, "date", Types.DateType.get()),
+      required(108, "ts", Types.TimestampType.withZone()),
+      required(110, "s", Types.StringType.get()),
+      required(112, "fixed", Types.FixedType.ofLength(7)),
+      optional(113, "bytes", Types.BinaryType.get()),
+      required(114, "dec_9_0", Types.DecimalType.of(9, 0)),
+      required(115, "dec_11_2", Types.DecimalType.of(11, 2)),
+      required(116, "dec_38_10", Types.DecimalType.of(38, 10)) // spark's 
maximum precision
+  );
+
+  @Test
+  public void testAssignIdsByNameMapping() {
+    Types.StructType structType = Types.StructType.of(
+        required(0, "id", Types.LongType.get()),
+        optional(1, "list_of_maps",
+            Types.ListType.ofOptional(2, Types.MapType.ofOptional(3, 4,
+                Types.StringType.get(),
+                SUPPORTED_PRIMITIVES))),
+        optional(5, "map_of_lists",
+            Types.MapType.ofOptional(6, 7,
+                Types.StringType.get(),
+                Types.ListType.ofOptional(8, SUPPORTED_PRIMITIVES))),
+        required(9, "list_of_lists",
+            Types.ListType.ofOptional(10, Types.ListType.ofOptional(11, 
SUPPORTED_PRIMITIVES))),
+        required(12, "map_of_maps",
+            Types.MapType.ofOptional(13, 14,
+                Types.StringType.get(),
+                Types.MapType.ofOptional(15, 16,
+                    Types.StringType.get(),
+                    SUPPORTED_PRIMITIVES))),
+        required(17, "list_of_struct_of_nested_types", 
Types.ListType.ofOptional(19, Types.StructType.of(
+            Types.NestedField.required(20, "m1", Types.MapType.ofOptional(21, 
22,
+                Types.StringType.get(),
+                SUPPORTED_PRIMITIVES)),
+            Types.NestedField.optional(23, "l1", Types.ListType.ofRequired(24, 
SUPPORTED_PRIMITIVES)),
+            Types.NestedField.required(25, "l2", Types.ListType.ofRequired(26, 
SUPPORTED_PRIMITIVES)),
+            Types.NestedField.optional(27, "m2", Types.MapType.ofOptional(28, 
29,
+                Types.StringType.get(),
+                SUPPORTED_PRIMITIVES))
+        )))
+    );
+
+    Schema schema = new Schema(TypeUtil.assignFreshIds(structType, new 
AtomicInteger(0)::incrementAndGet)
+        .asStructType().fields());
+    NameMapping nameMapping = MappingUtil.create(schema);
+    MessageType messageType = ParquetSchemaUtil.convert(schema, 
"complex_schema");
+    MessageType typeWithIdsFromNameMapping = 
ParquetSchemaUtil.applyNameMapping(messageType, nameMapping);
+    Schema newSchema = ParquetSchemaUtil.convert(typeWithIdsFromNameMapping);
+
+    Assert.assertEquals(schema.asStruct(), newSchema.asStruct());
+  }
+}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
index 2baf59e..51ddc94 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
@@ -142,9 +142,11 @@ public class SparkParquetReaders {
       for (int i = 0; i < fields.size(); i += 1) {
         Type fieldType = fields.get(i);
         int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1;
-        int id = fieldType.getId().intValue();
-        readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, 
fieldReaders.get(i)));
-        typesById.put(id, fieldType);
+        if (fieldType.getId() != null) {
+          int id = fieldType.getId().intValue();
+          readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, 
fieldReaders.get(i)));
+          typesById.put(id, fieldType);
+        }
       }
 
       List<Types.NestedField> expectedFields = expected != null ?
diff --git 
a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
 
b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
index 01cbe6f..3eb55eb 100644
--- 
a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
+++ 
b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
@@ -78,6 +78,7 @@ public class VectorizedSparkParquetReaders {
       List<Type> fields = groupType.getFields();
 
       IntStream.range(0, fields.size())
+          .filter(pos -> fields.get(pos).getId() != null)
           .forEach(pos -> readersById.put(fields.get(pos).getId().intValue(), 
fieldReaders.get(pos)));
 
       List<Types.NestedField> icebergFields = expected != null ?
@@ -114,6 +115,9 @@ public class VectorizedSparkParquetReaders {
         PrimitiveType primitive) {
 
       // Create arrow vector for this field
+      if (primitive.getId() == null) {
+        return null;
+      }
       int parquetFieldId = primitive.getId().intValue();
       ColumnDescriptor desc = 
parquetSchema.getColumnDescription(currentPath());
       // Nested types not yet supported for vectorized reads
diff --git 
a/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java 
b/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index eeb3ad5..f784b63 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
@@ -36,14 +37,16 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
 
 class BatchDataReader extends BaseDataReader<ColumnarBatch> {
   private final Schema expectedSchema;
+  private final String nameMapping;
   private final boolean caseSensitive;
   private final int batchSize;
 
   BatchDataReader(
-      CombinedScanTask task, Schema expectedSchema, FileIO fileIo,
+      CombinedScanTask task, Schema expectedSchema, String nameMapping, FileIO 
fileIo,
       EncryptionManager encryptionManager, boolean caseSensitive, int size) {
     super(task, fileIo, encryptionManager);
     this.expectedSchema = expectedSchema;
+    this.nameMapping = nameMapping;
     this.caseSensitive = caseSensitive;
     this.batchSize = size;
   }
@@ -54,7 +57,7 @@ class BatchDataReader extends BaseDataReader<ColumnarBatch> {
     InputFile location = getInputFile(task);
     Preconditions.checkNotNull(location, "Could not find InputFile associated 
with FileScanTask");
     if (task.file().format() == FileFormat.PARQUET) {
-      iter = Parquet.read(location)
+      Parquet.ReadBuilder builder = Parquet.read(location)
           .project(expectedSchema)
           .split(task.start(), task.length())
           .createBatchedReaderFunc(fileSchema -> 
VectorizedSparkParquetReaders.buildReader(expectedSchema,
@@ -65,8 +68,13 @@ class BatchDataReader extends BaseDataReader<ColumnarBatch> {
           // Spark eagerly consumes the batches. So the underlying memory 
allocated could be reused
           // without worrying about subsequent reads clobbering over each 
other. This improves
           // read performance as every batch read doesn't have to pay the cost 
of allocating memory.
-          .reuseContainers()
-          .build();
+          .reuseContainers();
+
+      if (nameMapping != null) {
+        builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
+      }
+
+      iter = builder.build();
     } else {
       throw new UnsupportedOperationException(
           "Format: " + task.file().format() + " not supported for batched 
reads");
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java 
b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
index d205c22..a9faf00 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -44,6 +44,7 @@ import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.hadoop.Util;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -70,6 +71,8 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+
 class Reader implements DataSourceReader, SupportsScanColumnarBatch, 
SupportsPushDownFilters,
     SupportsPushDownRequiredColumns, SupportsReportStatistics {
   private static final Logger LOG = LoggerFactory.getLogger(Reader.class);
@@ -202,12 +205,13 @@ class Reader implements DataSourceReader, 
SupportsScanColumnarBatch, SupportsPus
     Preconditions.checkState(batchSize > 0, "Invalid batch size");
     String tableSchemaString = SchemaParser.toJson(table.schema());
     String expectedSchemaString = SchemaParser.toJson(lazySchema());
+    String nameMappingString = table.properties().get(DEFAULT_NAME_MAPPING);
 
     List<InputPartition<ColumnarBatch>> readTasks = Lists.newArrayList();
     for (CombinedScanTask task : tasks()) {
       readTasks.add(new ReadTask<>(
-          task, tableSchemaString, expectedSchemaString, io, 
encryptionManager, caseSensitive, localityPreferred,
-          new BatchReaderFactory(batchSize)));
+          task, tableSchemaString, expectedSchemaString, nameMappingString, 
io, encryptionManager, caseSensitive,
+          localityPreferred, new BatchReaderFactory(batchSize)));
     }
     LOG.info("Batching input partitions with {} tasks.", readTasks.size());
 
@@ -221,12 +225,13 @@ class Reader implements DataSourceReader, 
SupportsScanColumnarBatch, SupportsPus
   public List<InputPartition<InternalRow>> planInputPartitions() {
     String tableSchemaString = SchemaParser.toJson(table.schema());
     String expectedSchemaString = SchemaParser.toJson(lazySchema());
+    String nameMappingString = table.properties().get(DEFAULT_NAME_MAPPING);
 
     List<InputPartition<InternalRow>> readTasks = Lists.newArrayList();
     for (CombinedScanTask task : tasks()) {
       readTasks.add(new ReadTask<>(
-          task, tableSchemaString, expectedSchemaString, io, 
encryptionManager, caseSensitive, localityPreferred,
-          InternalRowReaderFactory.INSTANCE));
+          task, tableSchemaString, expectedSchemaString, nameMappingString, 
io, encryptionManager, caseSensitive,
+          localityPreferred, InternalRowReaderFactory.INSTANCE));
     }
 
     return readTasks;
@@ -382,6 +387,7 @@ class Reader implements DataSourceReader, 
SupportsScanColumnarBatch, SupportsPus
     private final CombinedScanTask task;
     private final String tableSchemaString;
     private final String expectedSchemaString;
+    private final String nameMappingString;
     private final Broadcast<FileIO> io;
     private final Broadcast<EncryptionManager> encryptionManager;
     private final boolean caseSensitive;
@@ -390,10 +396,11 @@ class Reader implements DataSourceReader, 
SupportsScanColumnarBatch, SupportsPus
 
     private transient Schema tableSchema = null;
     private transient Schema expectedSchema = null;
+    private transient NameMapping nameMapping = null;
     private transient String[] preferredLocations;
 
     private ReadTask(CombinedScanTask task, String tableSchemaString, String 
expectedSchemaString,
-                     Broadcast<FileIO> io, Broadcast<EncryptionManager> 
encryptionManager,
+                     String nameMappingString, Broadcast<FileIO> io, 
Broadcast<EncryptionManager> encryptionManager,
                      boolean caseSensitive, boolean localityPreferred, 
ReaderFactory<T> readerFactory) {
       this.task = task;
       this.tableSchemaString = tableSchemaString;
@@ -404,11 +411,12 @@ class Reader implements DataSourceReader, 
SupportsScanColumnarBatch, SupportsPus
       this.localityPreferred = localityPreferred;
       this.preferredLocations = getPreferredLocations();
       this.readerFactory = readerFactory;
+      this.nameMappingString = nameMappingString;
     }
 
     @Override
     public InputPartitionReader<T> createPartitionReader() {
-      return readerFactory.create(task, lazyTableSchema(), 
lazyExpectedSchema(), io.value(),
+      return readerFactory.create(task, lazyTableSchema(), 
lazyExpectedSchema(), nameMappingString, io.value(),
           encryptionManager.value(), caseSensitive);
     }
 
@@ -442,7 +450,8 @@ class Reader implements DataSourceReader, 
SupportsScanColumnarBatch, SupportsPus
   }
 
   private interface ReaderFactory<T> extends Serializable {
-    InputPartitionReader<T> create(CombinedScanTask task, Schema tableSchema, 
Schema expectedSchema, FileIO io,
+    InputPartitionReader<T> create(CombinedScanTask task, Schema tableSchema, 
Schema expectedSchema,
+                                   String nameMapping, FileIO io,
                                    EncryptionManager encryptionManager, 
boolean caseSensitive);
   }
 
@@ -454,9 +463,9 @@ class Reader implements DataSourceReader, 
SupportsScanColumnarBatch, SupportsPus
 
     @Override
     public InputPartitionReader<InternalRow> create(CombinedScanTask task, 
Schema tableSchema, Schema expectedSchema,
-                                                    FileIO io, 
EncryptionManager encryptionManager,
-                                                    boolean caseSensitive) {
-      return new RowDataReader(task, tableSchema, expectedSchema, io, 
encryptionManager, caseSensitive);
+                                                    String nameMapping, FileIO 
io,
+                                                    EncryptionManager 
encryptionManager, boolean caseSensitive) {
+      return new RowDataReader(task, tableSchema, expectedSchema, nameMapping, 
io, encryptionManager, caseSensitive);
     }
   }
 
@@ -469,9 +478,9 @@ class Reader implements DataSourceReader, 
SupportsScanColumnarBatch, SupportsPus
 
     @Override
     public InputPartitionReader<ColumnarBatch> create(CombinedScanTask task, 
Schema tableSchema, Schema expectedSchema,
-                                                    FileIO io, 
EncryptionManager encryptionManager,
-                                                    boolean caseSensitive) {
-      return new BatchDataReader(task, expectedSchema, io, encryptionManager, 
caseSensitive, batchSize);
+                                                      String nameMapping, 
FileIO io,
+                                                      EncryptionManager 
encryptionManager, boolean caseSensitive) {
+      return new BatchDataReader(task, expectedSchema, nameMapping, io, 
encryptionManager, caseSensitive, batchSize);
     }
   }
 
diff --git 
a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java 
b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index c0e46eb..fb0b43d 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -39,6 +39,7 @@ import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -71,14 +72,16 @@ class RowDataReader extends BaseDataReader<InternalRow> {
 
   private final Schema tableSchema;
   private final Schema expectedSchema;
+  private final String nameMapping;
   private final boolean caseSensitive;
 
   RowDataReader(
-      CombinedScanTask task, Schema tableSchema, Schema expectedSchema, FileIO 
fileIo,
+      CombinedScanTask task, Schema tableSchema, Schema expectedSchema, String 
nameMapping, FileIO fileIo,
       EncryptionManager encryptionManager, boolean caseSensitive) {
     super(task, fileIo, encryptionManager);
     this.tableSchema = tableSchema;
     this.expectedSchema = expectedSchema;
+    this.nameMapping = nameMapping;
     this.caseSensitive = caseSensitive;
   }
 
@@ -151,13 +154,18 @@ class RowDataReader extends BaseDataReader<InternalRow> {
       FileScanTask task,
       Schema readSchema,
       Map<Integer, ?> idToConstant) {
-    return Parquet.read(location)
-        .project(readSchema)
+    Parquet.ReadBuilder builder = Parquet.read(location)
         .split(task.start(), task.length())
+        .project(readSchema)
         .createReaderFunc(fileSchema -> 
SparkParquetReaders.buildReader(readSchema, fileSchema, idToConstant))
         .filter(task.residual())
-        .caseSensitive(caseSensitive)
-        .build();
+        .caseSensitive(caseSensitive);
+
+    if (nameMapping != null) {
+      builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
+    }
+
+    return builder.build();
   }
 
   private CloseableIterable<InternalRow> newOrcIterable(
diff --git 
a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java 
b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
index 0a56874..384a95b 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
@@ -42,6 +42,8 @@ import org.apache.spark.sql.sources.v2.writer.DataWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+
 public class RowDataRewriter implements Serializable {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(RowDataRewriter.class);
@@ -49,6 +51,7 @@ public class RowDataRewriter implements Serializable {
   private final Broadcast<FileIO> fileIO;
   private final Broadcast<EncryptionManager> encryptionManager;
   private final String tableSchema;
+  private final String nameMapping;
   private final Writer.WriterFactory writerFactory;
   private final boolean caseSensitive;
 
@@ -60,6 +63,7 @@ public class RowDataRewriter implements Serializable {
 
     this.caseSensitive = caseSensitive;
     this.tableSchema = SchemaParser.toJson(table.schema());
+    this.nameMapping = table.properties().get(DEFAULT_NAME_MAPPING);
 
     String formatString = table.properties().getOrDefault(
         TableProperties.DEFAULT_FILE_FORMAT, 
TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
@@ -80,7 +84,8 @@ public class RowDataRewriter implements Serializable {
     TaskContext context = TaskContext.get();
 
     RowDataReader dataReader = new RowDataReader(task, 
SchemaParser.fromJson(tableSchema),
-        SchemaParser.fromJson(tableSchema), fileIO.value(), 
encryptionManager.value(), caseSensitive);
+        SchemaParser.fromJson(tableSchema), nameMapping, fileIO.value(),
+        encryptionManager.value(), caseSensitive);
 
     int partitionId = context.partitionId();
     long taskId = context.taskAttemptId();
diff --git 
a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java 
b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java
index 4f86fa8..37f57d4 100644
--- 
a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java
+++ 
b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java
@@ -22,17 +22,23 @@ package org.apache.iceberg.spark.source;
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
+import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.hive.HiveTableBaseTest;
+import org.apache.iceberg.mapping.MappingUtil;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkTableUtil;
 import org.apache.iceberg.spark.SparkTableUtil.SparkPartition;
+import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
@@ -48,6 +54,10 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import scala.collection.Seq;
 
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+import static org.apache.iceberg.TableProperties.PARQUET_VECTORIZATION_ENABLED;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
 public class TestSparkTableUtil extends HiveTableBaseTest {
   private static final Configuration CONF = HiveTableBaseTest.hiveConf;
   private static final String tableName = "hive_table";
@@ -200,4 +210,92 @@ public class TestSparkTableUtil extends HiveTableBaseTest {
     long count2 = spark.read().format("iceberg").load(DB_NAME + 
".test_partitioned_table").count();
     Assert.assertEquals("three values ", 3, count2);
   }
+
+  @Test
+  public void testImportWithNameMapping() throws Exception {
+    spark.table(qualifiedTableName).write().mode("overwrite").format("parquet")
+        .saveAsTable("original_table");
+
+    // The field is different, so it will be projected using the name mapping
+    Schema filteredSchema = new Schema(
+        optional(1, "data", Types.StringType.get())
+    );
+
+    NameMapping nameMapping = MappingUtil.create(filteredSchema);
+
+    TableIdentifier source = new TableIdentifier("original_table");
+    Table table = catalog.createTable(
+        org.apache.iceberg.catalog.TableIdentifier.of(DB_NAME, "target_table"),
+        filteredSchema,
+        SparkSchemaUtil.specForTable(spark, "original_table"));
+
+    table.updateProperties().set(DEFAULT_NAME_MAPPING, 
NameMappingParser.toJson(nameMapping)).commit();
+
+    File stagingDir = temp.newFolder("staging-dir");
+    SparkTableUtil.importSparkTable(spark, source, table, 
stagingDir.toString());
+
+    // The filter invokes the metric/dictionary row group filter, in which it 
projects the schema
+    // with the name mapping again to match the metrics read from the footer.
+    List<String> actual = spark.read().format("iceberg").load(DB_NAME + 
".target_table")
+        .select("data")
+        .sort("data")
+        .filter("data<'c'")
+        .collectAsList()
+        .stream()
+        .map(r -> r.getString(0))
+        .collect(Collectors.toList());
+
+    List<SimpleRecord> expected = Lists.newArrayList(
+        new SimpleRecord(2, "a"),
+        new SimpleRecord(1, "b")
+    );
+
+    
Assert.assertEquals(expected.stream().map(SimpleRecord::getData).collect(Collectors.toList()),
 actual);
+  }
+
+  @Test
+  public void testImportWithNameMappingForVectorizedParquetReader() throws 
Exception {
+    spark.table(qualifiedTableName).write().mode("overwrite").format("parquet")
+        .saveAsTable("original_table");
+
+    // The field is different, so it will be projected using the name mapping
+    Schema filteredSchema = new Schema(
+        optional(1, "data", Types.StringType.get())
+    );
+
+    NameMapping nameMapping = MappingUtil.create(filteredSchema);
+
+    TableIdentifier source = new TableIdentifier("original_table");
+    Table table = catalog.createTable(
+        org.apache.iceberg.catalog.TableIdentifier.of(DB_NAME, 
"target_table_for_vectorization"),
+        filteredSchema,
+        SparkSchemaUtil.specForTable(spark, "original_table"));
+
+    table.updateProperties()
+        .set(DEFAULT_NAME_MAPPING, NameMappingParser.toJson(nameMapping))
+        .set(PARQUET_VECTORIZATION_ENABLED, "true")
+        .commit();
+
+    File stagingDir = temp.newFolder("staging-dir");
+    SparkTableUtil.importSparkTable(spark, source, table, 
stagingDir.toString());
+
+    // The filter invokes the metric/dictionary row group filter, in which it 
projects the schema
+    // with the name mapping again to match the metrics read from the footer.
+    List<String> actual = spark.read().format("iceberg")
+        .load(DB_NAME + ".target_table_for_vectorization")
+        .select("data")
+        .sort("data")
+        .filter("data<'c'")
+        .collectAsList()
+        .stream()
+        .map(r -> r.getString(0))
+        .collect(Collectors.toList());
+
+    List<SimpleRecord> expected = Lists.newArrayList(
+        new SimpleRecord(2, "a"),
+        new SimpleRecord(1, "b")
+    );
+
+    
Assert.assertEquals(expected.stream().map(SimpleRecord::getData).collect(Collectors.toList()),
 actual);
+  }
 }

Reply via email to