This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 5a3cd22 Parquet: Support name mappings to recover field IDs (#830)
5a3cd22 is described below
commit 5a3cd22e775dfa8bf79deab675390aad48ba79a5
Author: Chen, Junjie <[email protected]>
AuthorDate: Thu Jun 18 00:57:08 2020 +0800
Parquet: Support name mappings to recover field IDs (#830)
---
.../main/java/org/apache/iceberg/avro/Avro.java | 2 +-
.../apache/iceberg/avro/TestAvroNameMapping.java | 2 +-
.../apache/iceberg/parquet/ApplyNameMapping.java | 110 +++++++++++++++++++++
.../java/org/apache/iceberg/parquet/Parquet.java | 25 ++++-
.../apache/iceberg/parquet/ParquetReadSupport.java | 17 +++-
.../org/apache/iceberg/parquet/ParquetReader.java | 8 +-
.../apache/iceberg/parquet/ParquetSchemaUtil.java | 58 ++++++++---
.../org/apache/iceberg/parquet/PruneColumns.java | 33 ++++---
.../java/org/apache/iceberg/parquet/ReadConf.java | 23 +++--
.../iceberg/parquet/VectorizedParquetReader.java | 10 +-
.../iceberg/parquet/TestParquetSchemaUtil.java | 95 ++++++++++++++++++
.../iceberg/spark/data/SparkParquetReaders.java | 8 +-
.../vectorized/VectorizedSparkParquetReaders.java | 4 +
.../iceberg/spark/source/BatchDataReader.java | 16 ++-
.../org/apache/iceberg/spark/source/Reader.java | 35 ++++---
.../apache/iceberg/spark/source/RowDataReader.java | 18 +++-
.../iceberg/spark/source/RowDataRewriter.java | 7 +-
.../iceberg/spark/source/TestSparkTableUtil.java | 98 ++++++++++++++++++
18 files changed, 487 insertions(+), 82 deletions(-)
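The patch adds a withNameMapping(NameMapping) option to the Avro and Parquet read builders and threads the mapping through ReadConf, ParquetReadSupport, and the Spark readers, so Parquet files written without Iceberg field IDs can be matched to the expected schema by name. A minimal usage sketch based on the new API (the file path and schema are illustrative, not part of the patch):

    // Hypothetical example: read a Parquet file that carries no Iceberg field IDs by
    // deriving a name mapping from the expected schema and handing it to the builder.
    import org.apache.iceberg.Files;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.io.CloseableIterable;
    import org.apache.iceberg.mapping.MappingUtil;
    import org.apache.iceberg.mapping.NameMapping;
    import org.apache.iceberg.parquet.Parquet;

    public class NameMappingReadSketch {
      public static <D> CloseableIterable<D> read(Schema expectedSchema, String path) {
        // Build a mapping of column names to the expected schema's field IDs.
        NameMapping mapping = MappingUtil.create(expectedSchema);
        return Parquet.read(Files.localInput(path))
            .project(expectedSchema)
            .withNameMapping(mapping)   // option added by this commit
            .build();
      }
    }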
diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java
index 689a1a8..24716e3 100644
--- a/core/src/main/java/org/apache/iceberg/avro/Avro.java
+++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java
@@ -235,7 +235,7 @@ public class Avro {
return this;
}
- public ReadBuilder nameMapping(NameMapping newNameMapping) {
+ public ReadBuilder withNameMapping(NameMapping newNameMapping) {
this.nameMapping = newNameMapping;
return this;
}
diff --git a/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java
index 02025a7..3a3c28c 100644
--- a/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java
+++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java
@@ -315,7 +315,7 @@ public class TestAvroNameMapping extends TestAvroReadProjection {
Iterable<GenericData.Record> records = Avro.read(Files.localInput(file))
.project(readSchema)
- .nameMapping(nameMapping)
+ .withNameMapping(nameMapping)
.build();
return Iterables.getOnlyElement(records);
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ApplyNameMapping.java b/parquet/src/main/java/org/apache/iceberg/parquet/ApplyNameMapping.java
new file mode 100644
index 0000000..85d6122
--- /dev/null
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ApplyNameMapping.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.apache.iceberg.mapping.MappedField;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
+
+class ApplyNameMapping extends ParquetTypeVisitor<Type> {
+ private final NameMapping nameMapping;
+
+ ApplyNameMapping(NameMapping nameMapping) {
+ this.nameMapping = nameMapping;
+ }
+
+ @Override
+ public Type message(MessageType message, List<Type> fields) {
+ Types.MessageTypeBuilder builder = org.apache.parquet.schema.Types.buildMessage();
+ fields.stream().filter(Objects::nonNull).forEach(builder::addField);
+
+ return builder.named(message.getName());
+ }
+
+ @Override
+ public Type struct(GroupType struct, List<Type> types) {
+ MappedField field = nameMapping.find(currentPath());
+ List<Type> actualTypes = types.stream().filter(Objects::nonNull).collect(Collectors.toList());
+ Type structType = struct.withNewFields(actualTypes);
+
+ return field == null ? structType : structType.withId(field.id());
+ }
+
+ @Override
+ public Type list(GroupType list, Type elementType) {
+ Preconditions.checkArgument(elementType != null,
+ "List type must have element field");
+
+ MappedField field = nameMapping.find(currentPath());
+ Type listType = org.apache.parquet.schema.Types.list(list.getRepetition())
+ .element(elementType)
+ .named(list.getName());
+
+ return field == null ? listType : listType.withId(field.id());
+ }
+
+ @Override
+ public Type map(GroupType map, Type keyType, Type valueType) {
+ Preconditions.checkArgument(keyType != null && valueType != null,
+ "Map type must have both key field and value field");
+
+ MappedField field = nameMapping.find(currentPath());
+ Type mapType = org.apache.parquet.schema.Types.map(map.getRepetition())
+ .key(keyType)
+ .value(valueType)
+ .named(map.getName());
+
+ return field == null ? mapType : mapType.withId(field.id());
+ }
+
+ @Override
+ public Type primitive(PrimitiveType primitive) {
+ MappedField field = nameMapping.find(currentPath());
+ return field == null ? primitive : primitive.withId(field.id());
+ }
+
+ @Override
+ public void beforeRepeatedElement(Type element) {
+ // do not add the repeated element's name
+ }
+
+ @Override
+ public void afterRepeatedElement(Type element) {
+ // do not remove the repeated element's name
+ }
+
+ @Override
+ public void beforeRepeatedKeyValue(Type keyValue) {
+ // do not add the repeated element's name
+ }
+
+ @Override
+ public void afterRepeatedKeyValue(Type keyValue) {
+ // do not remove the repeated element's name
+ }
+}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index df23785..fa6c80d 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -38,6 +38,7 @@ import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -312,6 +313,7 @@ public class Parquet {
private boolean callInit = false;
private boolean reuseContainers = false;
private int maxRecordsPerBatch = 10000;
+ private NameMapping nameMapping = null;
private ReadBuilder(InputFile file) {
this.file = file;
@@ -393,6 +395,11 @@ public class Parquet {
return this;
}
+ public ReadBuilder withNameMapping(NameMapping newNameMapping) {
+ this.nameMapping = newNameMapping;
+ return this;
+ }
+
@SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity"})
public <D> CloseableIterable<D> build() {
if (readerFunc != null || batchedReaderFunc != null) {
@@ -419,11 +426,11 @@ public class Parquet {
ParquetReadOptions options = optionsBuilder.build();
if (batchedReaderFunc != null) {
- return new VectorizedParquetReader(file, schema, options, batchedReaderFunc, filter, reuseContainers,
- caseSensitive, maxRecordsPerBatch);
+ return new VectorizedParquetReader(file, schema, options, batchedReaderFunc, nameMapping, filter,
+ reuseContainers, caseSensitive, maxRecordsPerBatch);
} else {
return new org.apache.iceberg.parquet.ParquetReader<>(
- file, schema, options, readerFunc, filter, reuseContainers, caseSensitive);
+ file, schema, options, readerFunc, nameMapping, filter, reuseContainers, caseSensitive);
}
}
@@ -475,6 +482,10 @@ public class Parquet {
builder.withFileRange(start, start + length);
}
+ if (nameMapping != null) {
+ builder.withNameMapping(nameMapping);
+ }
+
return new ParquetIterable<>(builder);
}
}
@@ -483,6 +494,7 @@ public class Parquet {
private Schema schema = null;
private ReadSupport<T> readSupport = null;
private boolean callInit = false;
+ private NameMapping nameMapping = null;
private ParquetReadBuilder(org.apache.parquet.io.InputFile file) {
super(file);
@@ -493,6 +505,11 @@ public class Parquet {
return this;
}
+ public ParquetReadBuilder<T> withNameMapping(NameMapping newNameMapping) {
+ this.nameMapping = newNameMapping;
+ return this;
+ }
+
public ParquetReadBuilder<T> readSupport(ReadSupport<T> newReadSupport) {
this.readSupport = newReadSupport;
return this;
@@ -505,7 +522,7 @@ public class Parquet {
@Override
protected ReadSupport<T> getReadSupport() {
- return new ParquetReadSupport<>(schema, readSupport, callInit);
+ return new ParquetReadSupport<>(schema, readSupport, callInit, nameMapping);
}
}
}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java
index bedc91a..645b3fa 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java
@@ -24,6 +24,7 @@ import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.parquet.avro.AvroReadSupport;
@@ -41,11 +42,13 @@ class ParquetReadSupport<T> extends ReadSupport<T> {
private final Schema expectedSchema;
private final ReadSupport<T> wrapped;
private final boolean callInit;
+ private final NameMapping nameMapping;
- ParquetReadSupport(Schema expectedSchema, ReadSupport<T> readSupport, boolean callInit) {
+ ParquetReadSupport(Schema expectedSchema, ReadSupport<T> readSupport, boolean callInit, NameMapping nameMapping) {
this.expectedSchema = expectedSchema;
this.wrapped = readSupport;
this.callInit = callInit;
+ this.nameMapping = nameMapping;
}
@Override
@@ -55,9 +58,15 @@ class ParquetReadSupport<T> extends ReadSupport<T> {
// matching to the file's columns by full path, so this must select columns by using the path
// in the file's schema.
- MessageType projection = ParquetSchemaUtil.hasIds(fileSchema) ?
- ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema) :
- ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema);
+ MessageType projection;
+ if (ParquetSchemaUtil.hasIds(fileSchema)) {
+ projection = ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema);
+ } else if (nameMapping != null) {
+ MessageType typeWithIds = ParquetSchemaUtil.applyNameMapping(fileSchema, nameMapping);
+ projection = ParquetSchemaUtil.pruneColumns(typeWithIds, expectedSchema);
+ } else {
+ projection = ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema);
+ }
// override some known backward-compatibility options
configuration.set("parquet.strict.typing", "false");
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
index 17d96c1..d61e4f4 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMapping;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.ParquetFileReader;
@@ -42,9 +43,10 @@ public class ParquetReader<T> extends CloseableGroup implements CloseableIterabl
private final Expression filter;
private final boolean reuseContainers;
private final boolean caseSensitive;
+ private final NameMapping nameMapping;
public ParquetReader(InputFile input, Schema expectedSchema, ParquetReadOptions options,
- Function<MessageType, ParquetValueReader<?>> readerFunc,
+ Function<MessageType, ParquetValueReader<?>> readerFunc, NameMapping nameMapping,
Expression filter, boolean reuseContainers, boolean caseSensitive) {
this.input = input;
this.expectedSchema = expectedSchema;
@@ -54,6 +56,7 @@ public class ParquetReader<T> extends CloseableGroup implements CloseableIterabl
this.filter = filter == Expressions.alwaysTrue() ? null : filter;
this.reuseContainers = reuseContainers;
this.caseSensitive = caseSensitive;
+ this.nameMapping = nameMapping;
}
private ReadConf<T> conf = null;
@@ -61,7 +64,8 @@ public class ParquetReader<T> extends CloseableGroup implements CloseableIterabl
private ReadConf<T> init() {
if (conf == null) {
ReadConf<T> readConf = new ReadConf<>(
- input, options, expectedSchema, filter, readerFunc, null, reuseContainers, caseSensitive, null);
+ input, options, expectedSchema, filter, readerFunc, null, nameMapping, reuseContainers,
+ caseSensitive, null);
this.conf = readConf.copy();
return readConf;
}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java
index 86dcf6b..2460096 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java
@@ -19,12 +19,16 @@
package org.apache.iceberg.parquet;
+import java.util.List;
import java.util.Set;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
+import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types.MessageTypeBuilder;
@@ -83,22 +87,7 @@ public class ParquetSchemaUtil {
}
public static boolean hasIds(MessageType fileSchema) {
- try {
- // Try to convert the type to Iceberg. If an ID assignment is needed, return false.
- ParquetTypeVisitor.visit(fileSchema, new MessageTypeToType(fileSchema) {
- @Override
- protected int nextId() {
- throw new IllegalStateException("Needed to assign ID");
- }
- });
-
- // no assignment was needed
- return true;
-
- } catch (IllegalStateException e) {
- // at least one field was missing an id.
- return false;
- }
+ return ParquetTypeVisitor.visit(fileSchema, new HasIds());
}
public static MessageType addFallbackIds(MessageType fileSchema) {
@@ -112,4 +101,41 @@ public class ParquetSchemaUtil {
return builder.named(fileSchema.getName());
}
+
+ public static MessageType applyNameMapping(MessageType fileSchema, NameMapping nameMapping) {
+ return (MessageType) ParquetTypeVisitor.visit(fileSchema, new ApplyNameMapping(nameMapping));
+ }
+
+ public static class HasIds extends ParquetTypeVisitor<Boolean> {
+ @Override
+ public Boolean message(MessageType message, List<Boolean> fields) {
+ return struct(message, fields);
+ }
+
+ @Override
+ public Boolean struct(GroupType struct, List<Boolean> hasIds) {
+ for (Boolean hasId : hasIds) {
+ if (hasId) {
+ return true;
+ }
+ }
+ return struct.getId() != null;
+ }
+
+ @Override
+ public Boolean list(GroupType array, Boolean hasId) {
+ return hasId || array.getId() != null;
+ }
+
+ @Override
+ public Boolean map(GroupType map, Boolean keyHasId, Boolean valueHasId) {
+ return keyHasId || valueHasId || map.getId() != null;
+ }
+
+ @Override
+ public Boolean primitive(PrimitiveType primitive) {
+ return primitive.getId() != null;
+ }
+ }
+
}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java b/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java
index bd5db40..d05db46 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java
@@ -21,7 +21,6 @@ package org.apache.iceberg.parquet;
import java.util.List;
import java.util.Set;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
@@ -45,7 +44,8 @@ class PruneColumns extends ParquetTypeVisitor<Type> {
for (int i = 0; i < fields.size(); i += 1) {
Type originalField = message.getType(i);
Type field = fields.get(i);
- if (selectedIds.contains(getId(originalField))) {
+ Integer fieldId = getId(originalField);
+ if (fieldId != null && selectedIds.contains(fieldId)) {
builder.addField(originalField);
fieldCount += 1;
} else if (field != null) {
@@ -71,7 +71,8 @@ class PruneColumns extends ParquetTypeVisitor<Type> {
for (int i = 0; i < fields.size(); i += 1) {
Type originalField = struct.getType(i);
Type field = fields.get(i);
- if (selectedIds.contains(getId(originalField))) {
+ Integer fieldId = getId(originalField);
+ if (fieldId != null && selectedIds.contains(fieldId)) {
filteredFields.add(originalField);
} else if (field != null) {
filteredFields.add(originalField);
@@ -94,17 +95,18 @@ class PruneColumns extends ParquetTypeVisitor<Type> {
public Type list(GroupType list, Type element) {
GroupType repeated = list.getType(0).asGroupType();
Type originalElement = repeated.getType(0);
- int elementId = getId(originalElement);
+ Integer elementId = getId(originalElement);
- if (selectedIds.contains(elementId)) {
+ if (elementId != null && selectedIds.contains(elementId)) {
return list;
} else if (element != null) {
if (element != originalElement) {
+ Integer listId = getId(list);
// the element type was projected
- return Types.list(list.getRepetition())
+ Type listType = Types.list(list.getRepetition())
.element(element)
- .id(getId(list))
.named(list.getName());
+ return listId == null ? listType : listType.withId(listId);
}
return list;
}
@@ -118,18 +120,20 @@ class PruneColumns extends ParquetTypeVisitor<Type> {
Type originalKey = repeated.getType(0);
Type originalValue = repeated.getType(1);
- int keyId = getId(originalKey);
- int valueId = getId(originalValue);
+ Integer keyId = getId(originalKey);
+ Integer valueId = getId(originalValue);
- if (selectedIds.contains(keyId) || selectedIds.contains(valueId)) {
+ if ((keyId != null && selectedIds.contains(keyId)) || (valueId != null && selectedIds.contains(valueId))) {
return map;
} else if (value != null) {
+ Integer mapId = getId(map);
if (value != originalValue) {
- return Types.map(map.getRepetition())
+ Type mapType = Types.map(map.getRepetition())
.key(originalKey)
.value(value)
- .id(getId(map))
.named(map.getName());
+
+ return mapId == null ? mapType : mapType.withId(mapId);
}
return map;
}
@@ -142,8 +146,7 @@ class PruneColumns extends ParquetTypeVisitor<Type> {
return null;
}
- private int getId(Type type) {
- Preconditions.checkNotNull(type.getId(), "Missing id for type: %s", type);
- return type.getId().intValue();
+ private Integer getId(Type type) {
+ return type.getId() == null ? null : type.getId().intValue();
}
}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
index e25730b..85f9eb2 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
@@ -25,11 +25,11 @@ import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
-import javax.annotation.Nullable;
import org.apache.iceberg.Schema;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.parquet.ParquetReadOptions;
@@ -49,15 +49,12 @@ class ReadConf<T> {
private final InputFile file;
private final ParquetReadOptions options;
private final MessageType projection;
- @Nullable
private final ParquetValueReader<T> model;
- @Nullable
private final VectorizedReader<T> vectorizedModel;
private final List<BlockMetaData> rowGroups;
private final boolean[] shouldSkip;
private final long totalValues;
private final boolean reuseContainers;
- @Nullable
private final Integer batchSize;
// List of column chunk metadata for each row group
@@ -66,19 +63,25 @@ class ReadConf<T> {
@SuppressWarnings("unchecked")
ReadConf(InputFile file, ParquetReadOptions options, Schema expectedSchema, Expression filter,
Function<MessageType, ParquetValueReader<?>> readerFunc, Function<MessageType,
- VectorizedReader<?>> batchedReaderFunc, boolean reuseContainers,
+ VectorizedReader<?>> batchedReaderFunc, NameMapping nameMapping, boolean reuseContainers,
boolean caseSensitive, Integer bSize) {
this.file = file;
this.options = options;
this.reader = newReader(file, options);
MessageType fileSchema = reader.getFileMetaData().getSchema();
- boolean hasIds = ParquetSchemaUtil.hasIds(fileSchema);
- MessageType typeWithIds = hasIds ? fileSchema : ParquetSchemaUtil.addFallbackIds(fileSchema);
+ MessageType typeWithIds;
+ if (ParquetSchemaUtil.hasIds(fileSchema)) {
+ typeWithIds = fileSchema;
+ this.projection = ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema);
+ } else if (nameMapping != null) {
+ typeWithIds = ParquetSchemaUtil.applyNameMapping(fileSchema, nameMapping);
+ this.projection = ParquetSchemaUtil.pruneColumns(typeWithIds, expectedSchema);
+ } else {
+ typeWithIds = ParquetSchemaUtil.addFallbackIds(fileSchema);
+ this.projection = ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema);
+ }
- this.projection = hasIds ?
- ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema) :
- ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema);
this.rowGroups = reader.getRowGroups();
this.shouldSkip = new boolean[rowGroups.size()];
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java
index 6cb9da5..481012c 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java
@@ -32,6 +32,7 @@ import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMapping;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.ParquetFileReader;
@@ -48,11 +49,12 @@ public class VectorizedParquetReader<T> extends CloseableGroup implements Closea
private boolean reuseContainers;
private final boolean caseSensitive;
private final int batchSize;
+ private final NameMapping nameMapping;
public VectorizedParquetReader(
InputFile input, Schema expectedSchema, ParquetReadOptions options,
- Function<MessageType, VectorizedReader<?>> readerFunc,
- Expression filter, boolean reuseContainers, boolean caseSensitive, int maxRecordsPerBatch) {
+ Function<MessageType, VectorizedReader<?>> readerFunc, NameMapping nameMapping, Expression filter,
+ boolean reuseContainers, boolean caseSensitive, int maxRecordsPerBatch) {
this.input = input;
this.expectedSchema = expectedSchema;
this.options = options;
@@ -62,6 +64,7 @@ public class VectorizedParquetReader<T> extends CloseableGroup implements Closea
this.reuseContainers = reuseContainers;
this.caseSensitive = caseSensitive;
this.batchSize = maxRecordsPerBatch;
+ this.nameMapping = nameMapping;
}
private ReadConf conf = null;
@@ -69,7 +72,8 @@ public class VectorizedParquetReader<T> extends CloseableGroup implements Closea
private ReadConf init() {
if (conf == null) {
ReadConf readConf = new ReadConf(
- input, options, expectedSchema, filter, null, batchReaderFunc, reuseContainers, caseSensitive, batchSize);
+ input, options, expectedSchema, filter, null, batchReaderFunc, nameMapping, reuseContainers,
+ caseSensitive, batchSize);
this.conf = readConf.copy();
return readConf;
}
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetSchemaUtil.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetSchemaUtil.java
new file mode 100644
index 0000000..f61ca5e
--- /dev/null
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetSchemaUtil.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.mapping.MappingUtil;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.parquet.schema.MessageType;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestParquetSchemaUtil {
+ private static final Types.StructType SUPPORTED_PRIMITIVES = Types.StructType.of(
+ required(100, "id", Types.LongType.get()),
+ optional(101, "data", Types.StringType.get()),
+ required(102, "b", Types.BooleanType.get()),
+ optional(103, "i", Types.IntegerType.get()),
+ required(104, "l", Types.LongType.get()),
+ optional(105, "f", Types.FloatType.get()),
+ required(106, "d", Types.DoubleType.get()),
+ optional(107, "date", Types.DateType.get()),
+ required(108, "ts", Types.TimestampType.withZone()),
+ required(110, "s", Types.StringType.get()),
+ required(112, "fixed", Types.FixedType.ofLength(7)),
+ optional(113, "bytes", Types.BinaryType.get()),
+ required(114, "dec_9_0", Types.DecimalType.of(9, 0)),
+ required(115, "dec_11_2", Types.DecimalType.of(11, 2)),
+ required(116, "dec_38_10", Types.DecimalType.of(38, 10)) // spark's maximum precision
+ );
+
+ @Test
+ public void testAssignIdsByNameMapping() {
+ Types.StructType structType = Types.StructType.of(
+ required(0, "id", Types.LongType.get()),
+ optional(1, "list_of_maps",
+ Types.ListType.ofOptional(2, Types.MapType.ofOptional(3, 4,
+ Types.StringType.get(),
+ SUPPORTED_PRIMITIVES))),
+ optional(5, "map_of_lists",
+ Types.MapType.ofOptional(6, 7,
+ Types.StringType.get(),
+ Types.ListType.ofOptional(8, SUPPORTED_PRIMITIVES))),
+ required(9, "list_of_lists",
+ Types.ListType.ofOptional(10, Types.ListType.ofOptional(11, SUPPORTED_PRIMITIVES))),
+ required(12, "map_of_maps",
+ Types.MapType.ofOptional(13, 14,
+ Types.StringType.get(),
+ Types.MapType.ofOptional(15, 16,
+ Types.StringType.get(),
+ SUPPORTED_PRIMITIVES))),
+ required(17, "list_of_struct_of_nested_types", Types.ListType.ofOptional(19, Types.StructType.of(
+ Types.NestedField.required(20, "m1", Types.MapType.ofOptional(21, 22,
+ Types.StringType.get(),
+ SUPPORTED_PRIMITIVES)),
+ Types.NestedField.optional(23, "l1", Types.ListType.ofRequired(24, SUPPORTED_PRIMITIVES)),
+ Types.NestedField.required(25, "l2", Types.ListType.ofRequired(26, SUPPORTED_PRIMITIVES)),
+ Types.NestedField.optional(27, "m2", Types.MapType.ofOptional(28, 29,
+ Types.StringType.get(),
+ SUPPORTED_PRIMITIVES))
+ )))
+ );
+
+ Schema schema = new Schema(TypeUtil.assignFreshIds(structType, new AtomicInteger(0)::incrementAndGet)
+ .asStructType().fields());
+ NameMapping nameMapping = MappingUtil.create(schema);
+ MessageType messageType = ParquetSchemaUtil.convert(schema, "complex_schema");
+ MessageType typeWithIdsFromNameMapping = ParquetSchemaUtil.applyNameMapping(messageType, nameMapping);
+ Schema newSchema = ParquetSchemaUtil.convert(typeWithIdsFromNameMapping);
+
+ Assert.assertEquals(schema.asStruct(), newSchema.asStruct());
+ }
+}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
index 2baf59e..51ddc94 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
@@ -142,9 +142,11 @@ public class SparkParquetReaders {
for (int i = 0; i < fields.size(); i += 1) {
Type fieldType = fields.get(i);
int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1;
- int id = fieldType.getId().intValue();
- readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i)));
- typesById.put(id, fieldType);
+ if (fieldType.getId() != null) {
+ int id = fieldType.getId().intValue();
+ readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i)));
+ typesById.put(id, fieldType);
+ }
}
List<Types.NestedField> expectedFields = expected != null ?
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
index 01cbe6f..3eb55eb 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
@@ -78,6 +78,7 @@ public class VectorizedSparkParquetReaders {
List<Type> fields = groupType.getFields();
IntStream.range(0, fields.size())
+ .filter(pos -> fields.get(pos).getId() != null)
.forEach(pos -> readersById.put(fields.get(pos).getId().intValue(), fieldReaders.get(pos)));
List<Types.NestedField> icebergFields = expected != null ?
@@ -114,6 +115,9 @@ public class VectorizedSparkParquetReaders {
PrimitiveType primitive) {
// Create arrow vector for this field
+ if (primitive.getId() == null) {
+ return null;
+ }
int parquetFieldId = primitive.getId().intValue();
ColumnDescriptor desc = parquetSchema.getColumnDescription(currentPath());
// Nested types not yet supported for vectorized reads
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index eeb3ad5..f784b63 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
@@ -36,14 +37,16 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
class BatchDataReader extends BaseDataReader<ColumnarBatch> {
private final Schema expectedSchema;
+ private final String nameMapping;
private final boolean caseSensitive;
private final int batchSize;
BatchDataReader(
- CombinedScanTask task, Schema expectedSchema, FileIO fileIo,
+ CombinedScanTask task, Schema expectedSchema, String nameMapping, FileIO fileIo,
EncryptionManager encryptionManager, boolean caseSensitive, int size) {
super(task, fileIo, encryptionManager);
this.expectedSchema = expectedSchema;
+ this.nameMapping = nameMapping;
this.caseSensitive = caseSensitive;
this.batchSize = size;
}
@@ -54,7 +57,7 @@ class BatchDataReader extends BaseDataReader<ColumnarBatch> {
InputFile location = getInputFile(task);
Preconditions.checkNotNull(location, "Could not find InputFile associated with FileScanTask");
if (task.file().format() == FileFormat.PARQUET) {
- iter = Parquet.read(location)
+ Parquet.ReadBuilder builder = Parquet.read(location)
.project(expectedSchema)
.split(task.start(), task.length())
.createBatchedReaderFunc(fileSchema -> VectorizedSparkParquetReaders.buildReader(expectedSchema,
@@ -65,8 +68,13 @@ class BatchDataReader extends BaseDataReader<ColumnarBatch> {
// Spark eagerly consumes the batches. So the underlying memory allocated could be reused
// without worrying about subsequent reads clobbering over each other. This improves
// read performance as every batch read doesn't have to pay the cost of allocating memory.
- .reuseContainers()
- .build();
+ .reuseContainers();
+
+ if (nameMapping != null) {
+ builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
+ }
+
+ iter = builder.build();
} else {
throw new UnsupportedOperationException(
"Format: " + task.file().format() + " not supported for batched
reads");
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
index d205c22..a9faf00 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -44,6 +44,7 @@ import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -70,6 +71,8 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+
class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPushDownFilters,
SupportsPushDownRequiredColumns, SupportsReportStatistics {
private static final Logger LOG = LoggerFactory.getLogger(Reader.class);
@@ -202,12 +205,13 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
Preconditions.checkState(batchSize > 0, "Invalid batch size");
String tableSchemaString = SchemaParser.toJson(table.schema());
String expectedSchemaString = SchemaParser.toJson(lazySchema());
+ String nameMappingString = table.properties().get(DEFAULT_NAME_MAPPING);
List<InputPartition<ColumnarBatch>> readTasks = Lists.newArrayList();
for (CombinedScanTask task : tasks()) {
readTasks.add(new ReadTask<>(
- task, tableSchemaString, expectedSchemaString, io, encryptionManager, caseSensitive, localityPreferred,
- new BatchReaderFactory(batchSize)));
+ task, tableSchemaString, expectedSchemaString, nameMappingString, io, encryptionManager, caseSensitive,
+ localityPreferred, new BatchReaderFactory(batchSize)));
}
LOG.info("Batching input partitions with {} tasks.", readTasks.size());
@@ -221,12 +225,13 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
public List<InputPartition<InternalRow>> planInputPartitions() {
String tableSchemaString = SchemaParser.toJson(table.schema());
String expectedSchemaString = SchemaParser.toJson(lazySchema());
+ String nameMappingString = table.properties().get(DEFAULT_NAME_MAPPING);
List<InputPartition<InternalRow>> readTasks = Lists.newArrayList();
for (CombinedScanTask task : tasks()) {
readTasks.add(new ReadTask<>(
- task, tableSchemaString, expectedSchemaString, io, encryptionManager, caseSensitive, localityPreferred,
- InternalRowReaderFactory.INSTANCE));
+ task, tableSchemaString, expectedSchemaString, nameMappingString, io, encryptionManager, caseSensitive,
+ localityPreferred, InternalRowReaderFactory.INSTANCE));
}
return readTasks;
@@ -382,6 +387,7 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
private final CombinedScanTask task;
private final String tableSchemaString;
private final String expectedSchemaString;
+ private final String nameMappingString;
private final Broadcast<FileIO> io;
private final Broadcast<EncryptionManager> encryptionManager;
private final boolean caseSensitive;
@@ -390,10 +396,11 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
private transient Schema tableSchema = null;
private transient Schema expectedSchema = null;
+ private transient NameMapping nameMapping = null;
private transient String[] preferredLocations;
private ReadTask(CombinedScanTask task, String tableSchemaString, String expectedSchemaString,
- Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
+ String nameMappingString, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
boolean caseSensitive, boolean localityPreferred, ReaderFactory<T> readerFactory) {
this.task = task;
this.tableSchemaString = tableSchemaString;
@@ -404,11 +411,12 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
this.localityPreferred = localityPreferred;
this.preferredLocations = getPreferredLocations();
this.readerFactory = readerFactory;
+ this.nameMappingString = nameMappingString;
}
@Override
public InputPartitionReader<T> createPartitionReader() {
- return readerFactory.create(task, lazyTableSchema(), lazyExpectedSchema(), io.value(),
+ return readerFactory.create(task, lazyTableSchema(), lazyExpectedSchema(), nameMappingString, io.value(),
encryptionManager.value(), caseSensitive);
}
@@ -442,7 +450,8 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
}
private interface ReaderFactory<T> extends Serializable {
- InputPartitionReader<T> create(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, FileIO io,
+ InputPartitionReader<T> create(CombinedScanTask task, Schema tableSchema, Schema expectedSchema,
+ String nameMapping, FileIO io,
EncryptionManager encryptionManager,
boolean caseSensitive);
}
@@ -454,9 +463,9 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
@Override
public InputPartitionReader<InternalRow> create(CombinedScanTask task, Schema tableSchema, Schema expectedSchema,
- FileIO io, EncryptionManager encryptionManager,
- boolean caseSensitive) {
- return new RowDataReader(task, tableSchema, expectedSchema, io, encryptionManager, caseSensitive);
+ String nameMapping, FileIO io,
+ EncryptionManager encryptionManager, boolean caseSensitive) {
+ return new RowDataReader(task, tableSchema, expectedSchema, nameMapping, io, encryptionManager, caseSensitive);
}
}
@@ -469,9 +478,9 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
@Override
public InputPartitionReader<ColumnarBatch> create(CombinedScanTask task, Schema tableSchema, Schema expectedSchema,
- FileIO io, EncryptionManager encryptionManager,
- boolean caseSensitive) {
- return new BatchDataReader(task, expectedSchema, io, encryptionManager, caseSensitive, batchSize);
+ String nameMapping, FileIO io,
+ EncryptionManager encryptionManager, boolean caseSensitive) {
+ return new BatchDataReader(task, expectedSchema, nameMapping, io, encryptionManager, caseSensitive, batchSize);
}
}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index c0e46eb..fb0b43d 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -39,6 +39,7 @@ import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -71,14 +72,16 @@ class RowDataReader extends BaseDataReader<InternalRow> {
private final Schema tableSchema;
private final Schema expectedSchema;
+ private final String nameMapping;
private final boolean caseSensitive;
RowDataReader(
- CombinedScanTask task, Schema tableSchema, Schema expectedSchema, FileIO fileIo,
+ CombinedScanTask task, Schema tableSchema, Schema expectedSchema, String nameMapping, FileIO fileIo,
EncryptionManager encryptionManager, boolean caseSensitive) {
super(task, fileIo, encryptionManager);
this.tableSchema = tableSchema;
this.expectedSchema = expectedSchema;
+ this.nameMapping = nameMapping;
this.caseSensitive = caseSensitive;
}
@@ -151,13 +154,18 @@ class RowDataReader extends BaseDataReader<InternalRow> {
FileScanTask task,
Schema readSchema,
Map<Integer, ?> idToConstant) {
- return Parquet.read(location)
- .project(readSchema)
+ Parquet.ReadBuilder builder = Parquet.read(location)
.split(task.start(), task.length())
+ .project(readSchema)
.createReaderFunc(fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema, idToConstant))
.filter(task.residual())
- .caseSensitive(caseSensitive)
- .build();
+ .caseSensitive(caseSensitive);
+
+ if (nameMapping != null) {
+ builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
+ }
+
+ return builder.build();
}
private CloseableIterable<InternalRow> newOrcIterable(
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
index 0a56874..384a95b 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
@@ -42,6 +42,8 @@ import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+
public class RowDataRewriter implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(RowDataRewriter.class);
@@ -49,6 +51,7 @@ public class RowDataRewriter implements Serializable {
private final Broadcast<FileIO> fileIO;
private final Broadcast<EncryptionManager> encryptionManager;
private final String tableSchema;
+ private final String nameMapping;
private final Writer.WriterFactory writerFactory;
private final boolean caseSensitive;
@@ -60,6 +63,7 @@ public class RowDataRewriter implements Serializable {
this.caseSensitive = caseSensitive;
this.tableSchema = SchemaParser.toJson(table.schema());
+ this.nameMapping = table.properties().get(DEFAULT_NAME_MAPPING);
String formatString = table.properties().getOrDefault(
TableProperties.DEFAULT_FILE_FORMAT,
TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
@@ -80,7 +84,8 @@ public class RowDataRewriter implements Serializable {
TaskContext context = TaskContext.get();
RowDataReader dataReader = new RowDataReader(task, SchemaParser.fromJson(tableSchema),
- SchemaParser.fromJson(tableSchema), fileIO.value(), encryptionManager.value(), caseSensitive);
+ SchemaParser.fromJson(tableSchema), nameMapping, fileIO.value(),
+ encryptionManager.value(), caseSensitive);
int partitionId = context.partitionId();
long taskId = context.taskAttemptId();
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java
index 4f86fa8..37f57d4 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java
@@ -22,17 +22,23 @@ package org.apache.iceberg.spark.source;
import java.io.File;
import java.io.IOException;
import java.util.List;
+import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.hive.HiveTableBaseTest;
+import org.apache.iceberg.mapping.MappingUtil;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.spark.SparkTableUtil.SparkPartition;
+import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
@@ -48,6 +54,10 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import scala.collection.Seq;
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+import static org.apache.iceberg.TableProperties.PARQUET_VECTORIZATION_ENABLED;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
public class TestSparkTableUtil extends HiveTableBaseTest {
private static final Configuration CONF = HiveTableBaseTest.hiveConf;
private static final String tableName = "hive_table";
@@ -200,4 +210,92 @@ public class TestSparkTableUtil extends HiveTableBaseTest {
long count2 = spark.read().format("iceberg").load(DB_NAME + ".test_partitioned_table").count();
Assert.assertEquals("three values ", 3, count2);
}
+
+ @Test
+ public void testImportWithNameMapping() throws Exception {
+ spark.table(qualifiedTableName).write().mode("overwrite").format("parquet")
+ .saveAsTable("original_table");
+
+ // The field is different so that it will project with name mapping
+ Schema filteredSchema = new Schema(
+ optional(1, "data", Types.StringType.get())
+ );
+
+ NameMapping nameMapping = MappingUtil.create(filteredSchema);
+
+ TableIdentifier source = new TableIdentifier("original_table");
+ Table table = catalog.createTable(
+ org.apache.iceberg.catalog.TableIdentifier.of(DB_NAME, "target_table"),
+ filteredSchema,
+ SparkSchemaUtil.specForTable(spark, "original_table"));
+
+ table.updateProperties().set(DEFAULT_NAME_MAPPING, NameMappingParser.toJson(nameMapping)).commit();
+
+ File stagingDir = temp.newFolder("staging-dir");
+ SparkTableUtil.importSparkTable(spark, source, table, stagingDir.toString());
+
+ // The filter invokes the metric/dictionary row group filters, which project the schema
+ // with name mapping again to match the metrics read from the footer.
+ List<String> actual = spark.read().format("iceberg").load(DB_NAME + ".target_table")
+ .select("data")
+ .sort("data")
+ .filter("data<'c'")
+ .collectAsList()
+ .stream()
+ .map(r -> r.getString(0))
+ .collect(Collectors.toList());
+
+ List<SimpleRecord> expected = Lists.newArrayList(
+ new SimpleRecord(2, "a"),
+ new SimpleRecord(1, "b")
+ );
+
+ Assert.assertEquals(expected.stream().map(SimpleRecord::getData).collect(Collectors.toList()), actual);
+ }
+
+ @Test
+ public void testImportWithNameMappingForVectorizedParquetReader() throws Exception {
+ spark.table(qualifiedTableName).write().mode("overwrite").format("parquet")
+ .saveAsTable("original_table");
+
+ // The field is different so that it will project with name mapping
+ Schema filteredSchema = new Schema(
+ optional(1, "data", Types.StringType.get())
+ );
+
+ NameMapping nameMapping = MappingUtil.create(filteredSchema);
+
+ TableIdentifier source = new TableIdentifier("original_table");
+ Table table = catalog.createTable(
+ org.apache.iceberg.catalog.TableIdentifier.of(DB_NAME, "target_table_for_vectorization"),
+ filteredSchema,
+ SparkSchemaUtil.specForTable(spark, "original_table"));
+
+ table.updateProperties()
+ .set(DEFAULT_NAME_MAPPING, NameMappingParser.toJson(nameMapping))
+ .set(PARQUET_VECTORIZATION_ENABLED, "true")
+ .commit();
+
+ File stagingDir = temp.newFolder("staging-dir");
+ SparkTableUtil.importSparkTable(spark, source, table, stagingDir.toString());
+
+ // The filter invokes the metric/dictionary row group filters, which project the schema
+ // with name mapping again to match the metrics read from the footer.
+ List<String> actual = spark.read().format("iceberg")
+ .load(DB_NAME + ".target_table_for_vectorization")
+ .select("data")
+ .sort("data")
+ .filter("data<'c'")
+ .collectAsList()
+ .stream()
+ .map(r -> r.getString(0))
+ .collect(Collectors.toList());
+
+ List<SimpleRecord> expected = Lists.newArrayList(
+ new SimpleRecord(2, "a"),
+ new SimpleRecord(1, "b")
+ );
+
+ Assert.assertEquals(expected.stream().map(SimpleRecord::getData).collect(Collectors.toList()), actual);
+ }
}
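For Spark reads, the tests above store the mapping in the table's schema.name-mapping.default property (TableProperties.DEFAULT_NAME_MAPPING), which Reader now forwards to RowDataReader and BatchDataReader. A short sketch of setting that property on an existing table, following the test code (the helper class and method names are illustrative):

    // Hypothetical sketch: persist a name mapping on a table so Spark scans of
    // imported Parquet files written without field IDs resolve columns by name.
    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableProperties;
    import org.apache.iceberg.mapping.MappingUtil;
    import org.apache.iceberg.mapping.NameMappingParser;

    class SetDefaultNameMapping {
      static void apply(Table table) {
        // Serialize a mapping derived from the table schema and commit it as a property.
        String mappingJson = NameMappingParser.toJson(MappingUtil.create(table.schema()));
        table.updateProperties()
            .set(TableProperties.DEFAULT_NAME_MAPPING, mappingJson)
            .commit();
      }
    }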