This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 604422b056 Core: Refactor internal Avro reader to resolve schemas
directly (#9366)
604422b056 is described below
commit 604422b056efee145f69944c39b4d99787dfd9f1
Author: Ryan Blue <[email protected]>
AuthorDate: Mon Jan 1 11:03:41 2024 -0800
Core: Refactor internal Avro reader to resolve schemas directly (#9366)
---
.../main/java/org/apache/iceberg/avro/Avro.java | 52 ++-
.../java/org/apache/iceberg/avro/AvroIterable.java | 4 +-
.../iceberg/avro/AvroWithPartnerVisitor.java | 211 ++++++++++++
.../org/apache/iceberg/avro/GenericAvroReader.java | 166 +++++++--
.../iceberg/avro/NameMappingDatumReader.java | 66 ++++
...ValueReader.java => SupportsCustomRecords.java} | 10 +-
.../java/org/apache/iceberg/avro/ValueReader.java | 4 +
.../java/org/apache/iceberg/avro/ValueReaders.java | 369 +++++++++++++++++++--
.../apache/iceberg/avro/TestAvroNameMapping.java | 23 +-
9 files changed, 813 insertions(+), 92 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java
b/core/src/main/java/org/apache/iceberg/avro/Avro.java
index 85cc8d9020..afc502e20d 100644
--- a/core/src/main/java/org/apache/iceberg/avro/Avro.java
+++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java
@@ -61,6 +61,7 @@ import org.apache.iceberg.io.DeleteSchemaUtil;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.mapping.MappingUtil;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -610,11 +611,12 @@ public class Avro {
private org.apache.iceberg.Schema schema = null;
private Function<Schema, DatumReader<?>> createReaderFunc = null;
private BiFunction<org.apache.iceberg.Schema, Schema, DatumReader<?>>
createReaderBiFunc = null;
+ private Function<org.apache.iceberg.Schema, DatumReader<?>>
createResolvingReaderFunc = null;
@SuppressWarnings("UnnecessaryLambda")
- private final Function<Schema, DatumReader<?>> defaultCreateReaderFunc =
+ private final Function<org.apache.iceberg.Schema, DatumReader<?>>
defaultCreateReaderFunc =
readSchema -> {
- GenericAvroReader<?> reader = new GenericAvroReader<>(readSchema);
+ GenericAvroReader<?> reader = GenericAvroReader.create(readSchema);
reader.setClassLoader(loader);
return reader;
};
@@ -627,15 +629,28 @@ public class Avro {
this.file = file;
}
+ public ReadBuilder createResolvingReader(
+ Function<org.apache.iceberg.Schema, DatumReader<?>> readerFunction) {
+ Preconditions.checkState(
+ createReaderBiFunc == null && createReaderFunc == null,
+ "Cannot set multiple read builder functions");
+ this.createResolvingReaderFunc = readerFunction;
+ return this;
+ }
+
public ReadBuilder createReaderFunc(Function<Schema, DatumReader<?>>
readerFunction) {
- Preconditions.checkState(createReaderBiFunc == null, "Cannot set
multiple createReaderFunc");
+ Preconditions.checkState(
+ createReaderBiFunc == null && createResolvingReaderFunc == null,
+ "Cannot set multiple read builder functions");
this.createReaderFunc = readerFunction;
return this;
}
public ReadBuilder createReaderFunc(
BiFunction<org.apache.iceberg.Schema, Schema, DatumReader<?>>
readerFunction) {
- Preconditions.checkState(createReaderFunc == null, "Cannot set multiple
createReaderFunc");
+ Preconditions.checkState(
+ createReaderFunc == null && createResolvingReaderFunc == null,
+ "Cannot set multiple read builder functions");
this.createReaderBiFunc = readerFunction;
return this;
}
@@ -683,23 +698,34 @@ public class Avro {
return this;
}
+ @SuppressWarnings("unchecked")
public <D> AvroIterable<D> build() {
Preconditions.checkNotNull(schema, "Schema is required");
- Function<Schema, DatumReader<?>> readerFunc;
+
+ if (null == nameMapping) {
+ this.nameMapping = MappingUtil.create(schema);
+ }
+
+ DatumReader<D> reader;
if (createReaderBiFunc != null) {
- readerFunc = avroSchema -> createReaderBiFunc.apply(schema,
avroSchema);
+ reader =
+ new ProjectionDatumReader<>(
+ avroSchema -> createReaderBiFunc.apply(schema, avroSchema),
schema, renames, null);
} else if (createReaderFunc != null) {
- readerFunc = createReaderFunc;
+ reader = new ProjectionDatumReader<>(createReaderFunc, schema,
renames, null);
+ } else if (createResolvingReaderFunc != null) {
+ reader = (DatumReader<D>) createResolvingReaderFunc.apply(schema);
} else {
- readerFunc = defaultCreateReaderFunc;
+ reader = (DatumReader<D>) defaultCreateReaderFunc.apply(schema);
+ }
+
+ if (reader instanceof SupportsCustomRecords) {
+ ((SupportsCustomRecords) reader).setClassLoader(loader);
+ ((SupportsCustomRecords) reader).setRenames(renames);
}
return new AvroIterable<>(
- file,
- new ProjectionDatumReader<>(readerFunc, schema, renames,
nameMapping),
- start,
- length,
- reuseContainers);
+ file, new NameMappingDatumReader<>(nameMapping, reader), start,
length, reuseContainers);
}
}
diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroIterable.java
b/core/src/main/java/org/apache/iceberg/avro/AvroIterable.java
index 49acb8010b..3bcc6a4799 100644
--- a/core/src/main/java/org/apache/iceberg/avro/AvroIterable.java
+++ b/core/src/main/java/org/apache/iceberg/avro/AvroIterable.java
@@ -31,6 +31,7 @@ import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Suppliers;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
public class AvroIterable<D> extends CloseableGroup implements
CloseableIterable<D> {
@@ -78,7 +79,8 @@ public class AvroIterable<D> extends CloseableGroup
implements CloseableIterable
if (start != null) {
if (reader instanceof SupportsRowPosition) {
((SupportsRowPosition) reader)
- .setRowPositionSupplier(() ->
AvroIO.findStartingRowPos(file::newStream, start));
+ .setRowPositionSupplier(
+ Suppliers.memoize(() ->
AvroIO.findStartingRowPos(file::newStream, start)));
}
fileReader = new AvroRangeIterator<>(fileReader, start, end);
} else if (reader instanceof SupportsRowPosition) {
diff --git
a/core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerVisitor.java
b/core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerVisitor.java
new file mode 100644
index 0000000000..0147dbf37d
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerVisitor.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.avro;
+
+import java.util.Deque;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+public class AvroWithPartnerVisitor<P, R> {
+ public interface PartnerAccessors<P> {
+ P fieldPartner(P partnerStruct, Integer fieldId, String name);
+
+ P mapKeyPartner(P partnerMap);
+
+ P mapValuePartner(P partnerMap);
+
+ P listElementPartner(P partnerList);
+ }
+
+ static class FieldIDAccessors implements
AvroWithPartnerVisitor.PartnerAccessors<Type> {
+ private static final FieldIDAccessors INSTANCE = new FieldIDAccessors();
+
+ public static FieldIDAccessors get() {
+ return INSTANCE;
+ }
+
+ @Override
+ public Type fieldPartner(Type partner, Integer fieldId, String name) {
+ Types.NestedField field = partner.asStructType().field(fieldId);
+ return field != null ? field.type() : null;
+ }
+
+ @Override
+ public Type mapKeyPartner(Type partner) {
+ return partner.asMapType().keyType();
+ }
+
+ @Override
+ public Type mapValuePartner(Type partner) {
+ return partner.asMapType().valueType();
+ }
+
+ @Override
+ public Type listElementPartner(Type partner) {
+ return partner.asListType().elementType();
+ }
+ }
+
+ /** Used to fail on recursive types. */
+ private Deque<String> recordLevels = Lists.newLinkedList();
+
+ public R record(P partner, Schema record, List<R> fieldResults) {
+ return null;
+ }
+
+ public R union(P partner, Schema union, List<R> optionResults) {
+ return null;
+ }
+
+ public R array(P partner, Schema array, R elementResult) {
+ return null;
+ }
+
+ public R arrayMap(P partner, Schema map, R keyResult, R valueResult) {
+ return null;
+ }
+
+ public R map(P partner, Schema map, R valueResult) {
+ return null;
+ }
+
+ public R primitive(P partner, Schema primitive) {
+ return null;
+ }
+
+ public static <P, R> R visit(
+ P partner,
+ Schema schema,
+ AvroWithPartnerVisitor<P, R> visitor,
+ PartnerAccessors<P> accessors) {
+ switch (schema.getType()) {
+ case RECORD:
+ return visitRecord(partner, schema, visitor, accessors);
+
+ case UNION:
+ return visitUnion(partner, schema, visitor, accessors);
+
+ case ARRAY:
+ return visitArray(partner, schema, visitor, accessors);
+
+ case MAP:
+ return visitor.map(
+ partner,
+ schema,
+ visit(
+ partner != null ? accessors.mapValuePartner(partner) : null,
+ schema.getValueType(),
+ visitor,
+ accessors));
+
+ default:
+ return visitor.primitive(partner, schema);
+ }
+ }
+
+ private static <P, R> R visitRecord(
+ P partnerStruct,
+ Schema record,
+ AvroWithPartnerVisitor<P, R> visitor,
+ PartnerAccessors<P> accessors) {
+ // check to make sure this hasn't been visited before
+ String recordName = record.getFullName();
+ Preconditions.checkState(
+ !visitor.recordLevels.contains(recordName),
+ "Cannot process recursive Avro record %s",
+ recordName);
+ visitor.recordLevels.push(recordName);
+
+ List<Schema.Field> fields = record.getFields();
+ List<R> results = Lists.newArrayListWithExpectedSize(fields.size());
+ for (int pos = 0; pos < fields.size(); pos += 1) {
+ Schema.Field field = fields.get(pos);
+ Integer fieldId = AvroSchemaUtil.fieldId(field);
+
+ P fieldPartner =
+ partnerStruct != null && fieldId != null
+ ? accessors.fieldPartner(partnerStruct, fieldId, field.name())
+ : null;
+ results.add(visit(fieldPartner, field.schema(), visitor, accessors));
+ }
+
+ visitor.recordLevels.pop();
+
+ return visitor.record(partnerStruct, record, results);
+ }
+
+ private static <P, R> R visitUnion(
+ P partner,
+ Schema union,
+ AvroWithPartnerVisitor<P, R> visitor,
+ PartnerAccessors<P> accessors) {
+ Preconditions.checkArgument(
+ AvroSchemaUtil.isOptionSchema(union), "Cannot visit non-option union:
%s", union);
+
+ List<Schema> types = union.getTypes();
+ List<R> options = Lists.newArrayListWithExpectedSize(types.size());
+ for (Schema branch : types) {
+ options.add(visit(partner, branch, visitor, accessors));
+ }
+
+ return visitor.union(partner, union, options);
+ }
+
+ private static <P, R> R visitArray(
+ P partnerArray,
+ Schema array,
+ AvroWithPartnerVisitor<P, R> visitor,
+ PartnerAccessors<P> accessors) {
+ if (array.getLogicalType() instanceof LogicalMap) {
+ Preconditions.checkState(
+ AvroSchemaUtil.isKeyValueSchema(array.getElementType()),
+ "Cannot visit invalid logical map type: %s",
+ array);
+
+ List<Schema.Field> keyValueFields = array.getElementType().getFields();
+ return visitor.arrayMap(
+ partnerArray,
+ array,
+ visit(
+ partnerArray != null ? accessors.mapKeyPartner(partnerArray) :
null,
+ keyValueFields.get(0).schema(),
+ visitor,
+ accessors),
+ visit(
+ partnerArray != null ? accessors.mapValuePartner(partnerArray) :
null,
+ keyValueFields.get(1).schema(),
+ visitor,
+ accessors));
+
+ } else {
+ return visitor.array(
+ partnerArray,
+ array,
+ visit(
+ partnerArray != null ?
accessors.listElementPartner(partnerArray) : null,
+ array.getElementType(),
+ visitor,
+ accessors));
+ }
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java
b/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java
index 0fa2e79581..d89489d92a 100644
--- a/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java
+++ b/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.avro;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import java.util.function.Supplier;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
@@ -27,39 +28,68 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
+import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.common.DynClasses;
-import org.apache.iceberg.data.avro.DecoderResolver;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
-public class GenericAvroReader<T> implements DatumReader<T>,
SupportsRowPosition {
+public class GenericAvroReader<T>
+ implements DatumReader<T>, SupportsRowPosition, SupportsCustomRecords {
- private final Schema readSchema;
+ private final Types.StructType expectedType;
private ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ private Map<String, String> renames = ImmutableMap.of();
+ private Map<Integer, ?> idToConstant = ImmutableMap.of();
private Schema fileSchema = null;
private ValueReader<T> reader = null;
+ public static <D> GenericAvroReader<D> create(org.apache.iceberg.Schema
schema) {
+ return new GenericAvroReader<>(schema);
+ }
+
public static <D> GenericAvroReader<D> create(Schema schema) {
return new GenericAvroReader<>(schema);
}
+ GenericAvroReader(org.apache.iceberg.Schema readSchema) {
+ this.expectedType = readSchema.asStruct();
+ }
+
GenericAvroReader(Schema readSchema) {
- this.readSchema = readSchema;
+ this.expectedType = AvroSchemaUtil.convert(readSchema).asStructType();
}
@SuppressWarnings("unchecked")
private void initReader() {
- this.reader = (ValueReader<T>) AvroSchemaVisitor.visit(readSchema, new
ReadBuilder(loader));
+ this.reader =
+ (ValueReader<T>)
+ AvroWithPartnerVisitor.visit(
+ expectedType,
+ fileSchema,
+ new ResolvingReadBuilder(expectedType,
fileSchema.getFullName()),
+ AvroWithPartnerVisitor.FieldIDAccessors.get());
}
@Override
public void setSchema(Schema schema) {
- this.fileSchema = Schema.applyAliases(schema, readSchema);
+ this.fileSchema = schema;
initReader();
}
+ @Override
public void setClassLoader(ClassLoader newClassLoader) {
this.loader = newClassLoader;
}
+ @Override
+ public void setRenames(Map<String, String> renames) {
+ this.renames = renames;
+ }
+
@Override
public void setRowPositionSupplier(Supplier<Long> posSupplier) {
if (reader instanceof SupportsRowPosition) {
@@ -69,62 +99,108 @@ public class GenericAvroReader<T> implements
DatumReader<T>, SupportsRowPosition
@Override
public T read(T reuse, Decoder decoder) throws IOException {
- return DecoderResolver.resolveAndRead(decoder, readSchema, fileSchema,
reader, reuse);
+ return reader.read(decoder, reuse);
}
- private static class ReadBuilder extends AvroSchemaVisitor<ValueReader<?>> {
- private final ClassLoader loader;
+ private class ResolvingReadBuilder extends AvroWithPartnerVisitor<Type,
ValueReader<?>> {
+ private final Map<Type, Schema> avroSchemas;
- private ReadBuilder(ClassLoader loader) {
- this.loader = loader;
+ private ResolvingReadBuilder(Types.StructType expectedType, String
rootName) {
+ this.avroSchemas = AvroSchemaUtil.convertTypes(expectedType, rootName);
}
@Override
- @SuppressWarnings("unchecked")
- public ValueReader<?> record(Schema record, List<String> names,
List<ValueReader<?>> fields) {
- try {
- Class<?> recordClass =
-
DynClasses.builder().loader(loader).impl(record.getFullName()).buildChecked();
- if (IndexedRecord.class.isAssignableFrom(recordClass)) {
- return ValueReaders.record(fields, (Class<? extends IndexedRecord>)
recordClass, record);
+ public ValueReader<?> record(Type partner, Schema record,
List<ValueReader<?>> fieldResults) {
+ Types.StructType expected = partner != null ? partner.asStructType() :
null;
+ Map<Integer, Integer> idToPos = idToPos(expected);
+
+ List<Pair<Integer, ValueReader<?>>> readPlan = Lists.newArrayList();
+ List<Schema.Field> fileFields = record.getFields();
+ for (int pos = 0; pos < fileFields.size(); pos += 1) {
+ Schema.Field field = fileFields.get(pos);
+ ValueReader<?> fieldReader = fieldResults.get(pos);
+ Integer fieldId = AvroSchemaUtil.fieldId(field);
+ Integer projectionPos = idToPos.remove(fieldId);
+
+ Object constant = idToConstant.get(fieldId);
+ if (projectionPos != null && constant != null) {
+ readPlan.add(
+ Pair.of(projectionPos,
ValueReaders.replaceWithConstant(fieldReader, constant)));
+ } else {
+ readPlan.add(Pair.of(projectionPos, fieldReader));
+ }
+ }
+
+ // handle any expected columns that are not in the data file
+ for (Map.Entry<Integer, Integer> idAndPos : idToPos.entrySet()) {
+ int fieldId = idAndPos.getKey();
+ int pos = idAndPos.getValue();
+
+ Object constant = idToConstant.get(fieldId);
+ Types.NestedField field = expected.field(fieldId);
+ if (constant != null) {
+ readPlan.add(Pair.of(pos, ValueReaders.constant(constant)));
+ } else if (fieldId == MetadataColumns.IS_DELETED.fieldId()) {
+ readPlan.add(Pair.of(pos, ValueReaders.constant(false)));
+ } else if (fieldId == MetadataColumns.ROW_POSITION.fieldId()) {
+ readPlan.add(Pair.of(pos, ValueReaders.positions()));
+ } else if (field.isOptional()) {
+ readPlan.add(Pair.of(pos, ValueReaders.constant(null)));
+ } else {
+ throw new IllegalArgumentException(
+ String.format("Missing required field: %s", field.name()));
}
+ }
- return ValueReaders.record(fields, record);
+ return recordReader(readPlan, avroSchemas.get(partner),
record.getFullName());
+ }
- } catch (ClassNotFoundException e) {
- return ValueReaders.record(fields, record);
+ @SuppressWarnings("unchecked")
+ private ValueReader<?> recordReader(
+ List<Pair<Integer, ValueReader<?>>> readPlan, Schema avroSchema,
String recordName) {
+ String className = renames.getOrDefault(recordName, recordName);
+ if (className != null) {
+ try {
+ Class<?> recordClass =
DynClasses.builder().loader(loader).impl(className).buildChecked();
+ if (IndexedRecord.class.isAssignableFrom(recordClass)) {
+ return ValueReaders.record(
+ avroSchema, (Class<? extends IndexedRecord>) recordClass,
readPlan);
+ }
+ } catch (ClassNotFoundException e) {
+ // use a generic record reader below
+ }
}
+
+ return ValueReaders.record(avroSchema, readPlan);
}
@Override
- public ValueReader<?> union(Schema union, List<ValueReader<?>> options) {
+ public ValueReader<?> union(Type partner, Schema union,
List<ValueReader<?>> options) {
return ValueReaders.union(options);
}
@Override
- public ValueReader<?> array(Schema array, ValueReader<?> elementReader) {
- if (array.getLogicalType() instanceof LogicalMap) {
- ValueReaders.StructReader<?> keyValueReader =
(ValueReaders.StructReader) elementReader;
- ValueReader<?> keyReader = keyValueReader.reader(0);
- ValueReader<?> valueReader = keyValueReader.reader(1);
-
- if (keyReader == ValueReaders.utf8s()) {
- return ValueReaders.arrayMap(ValueReaders.strings(), valueReader);
- }
-
- return ValueReaders.arrayMap(keyReader, valueReader);
+ public ValueReader<?> arrayMap(
+ Type partner, Schema map, ValueReader<?> keyReader, ValueReader<?>
valueReader) {
+ if (keyReader == ValueReaders.utf8s()) {
+ return ValueReaders.arrayMap(ValueReaders.strings(), valueReader);
}
+ return ValueReaders.arrayMap(keyReader, valueReader);
+ }
+
+ @Override
+ public ValueReader<?> array(Type partner, Schema array, ValueReader<?>
elementReader) {
return ValueReaders.array(elementReader);
}
@Override
- public ValueReader<?> map(Schema map, ValueReader<?> valueReader) {
+ public ValueReader<?> map(Type partner, Schema map, ValueReader<?>
valueReader) {
return ValueReaders.map(ValueReaders.strings(), valueReader);
}
@Override
- public ValueReader<?> primitive(Schema primitive) {
+ public ValueReader<?> primitive(Type partner, Schema primitive) {
LogicalType logicalType = primitive.getLogicalType();
if (logicalType != null) {
switch (logicalType.getName()) {
@@ -163,10 +239,16 @@ public class GenericAvroReader<T> implements
DatumReader<T>, SupportsRowPosition
case BOOLEAN:
return ValueReaders.booleans();
case INT:
+ if (partner != null && partner.typeId() == Type.TypeID.LONG) {
+ return ValueReaders.intsAsLongs();
+ }
return ValueReaders.ints();
case LONG:
return ValueReaders.longs();
case FLOAT:
+ if (partner != null && partner.typeId() == Type.TypeID.DOUBLE) {
+ return ValueReaders.floatsAsDoubles();
+ }
return ValueReaders.floats();
case DOUBLE:
return ValueReaders.doubles();
@@ -182,5 +264,19 @@ public class GenericAvroReader<T> implements
DatumReader<T>, SupportsRowPosition
throw new IllegalArgumentException("Unsupported type: " + primitive);
}
}
+
+ private Map<Integer, Integer> idToPos(Types.StructType struct) {
+ Map<Integer, Integer> idToPos = Maps.newHashMap();
+
+ if (struct != null) {
+ List<Types.NestedField> fields = struct.fields();
+ for (int pos = 0; pos < fields.size(); pos += 1) {
+ Types.NestedField field = fields.get(pos);
+ idToPos.put(field.fieldId(), pos);
+ }
+ }
+
+ return idToPos;
+ }
}
}
diff --git
a/core/src/main/java/org/apache/iceberg/avro/NameMappingDatumReader.java
b/core/src/main/java/org/apache/iceberg/avro/NameMappingDatumReader.java
new file mode 100644
index 0000000000..83f2beabee
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/avro/NameMappingDatumReader.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.avro;
+
+import java.io.IOException;
+import java.util.function.Supplier;
+import org.apache.avro.Schema;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.iceberg.mapping.NameMapping;
+
+/**
+ * A delegating DatumReader that applies a name mapping to a file schema to
enable reading Avro
+ * files that were written without field IDs.
+ *
+ * @param <D> Java class of datums produced by this reader
+ */
+public class NameMappingDatumReader<D> implements DatumReader<D>,
SupportsRowPosition {
+ private final NameMapping nameMapping;
+ private final DatumReader<D> wrapped;
+
+ public NameMappingDatumReader(NameMapping nameMapping, DatumReader<D>
wrapped) {
+ this.nameMapping = nameMapping;
+ this.wrapped = wrapped;
+ }
+
+ @Override
+ public void setSchema(Schema newFileSchema) {
+ Schema fileSchema;
+ if (AvroSchemaUtil.hasIds(newFileSchema)) {
+ fileSchema = newFileSchema;
+ } else {
+ fileSchema = AvroSchemaUtil.applyNameMapping(newFileSchema, nameMapping);
+ }
+
+ wrapped.setSchema(fileSchema);
+ }
+
+ @Override
+ public D read(D reuse, Decoder in) throws IOException {
+ return wrapped.read(reuse, in);
+ }
+
+ @Override
+ public void setRowPositionSupplier(Supplier<Long> posSupplier) {
+ if (wrapped instanceof SupportsRowPosition) {
+ ((SupportsRowPosition) wrapped).setRowPositionSupplier(posSupplier);
+ }
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReader.java
b/core/src/main/java/org/apache/iceberg/avro/SupportsCustomRecords.java
similarity index 79%
copy from core/src/main/java/org/apache/iceberg/avro/ValueReader.java
copy to core/src/main/java/org/apache/iceberg/avro/SupportsCustomRecords.java
index 5470b8168f..c05e206e13 100644
--- a/core/src/main/java/org/apache/iceberg/avro/ValueReader.java
+++ b/core/src/main/java/org/apache/iceberg/avro/SupportsCustomRecords.java
@@ -18,9 +18,11 @@
*/
package org.apache.iceberg.avro;
-import java.io.IOException;
-import org.apache.avro.io.Decoder;
+import java.util.Map;
-public interface ValueReader<T> {
- T read(Decoder decoder, Object reuse) throws IOException;
+/** An interface for Avro DatumReaders to support custom record classes. */
+interface SupportsCustomRecords {
+ void setClassLoader(ClassLoader loader);
+
+ void setRenames(Map<String, String> renames);
}
diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReader.java
b/core/src/main/java/org/apache/iceberg/avro/ValueReader.java
index 5470b8168f..d01bd74f53 100644
--- a/core/src/main/java/org/apache/iceberg/avro/ValueReader.java
+++ b/core/src/main/java/org/apache/iceberg/avro/ValueReader.java
@@ -23,4 +23,8 @@ import org.apache.avro.io.Decoder;
public interface ValueReader<T> {
T read(Decoder decoder, Object reuse) throws IOException;
+
+ default void skip(Decoder decoder) throws IOException {
+ read(decoder, null);
+ }
}
diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
index 19789cce82..d530bc1854 100644
--- a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
+++ b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
@@ -31,6 +31,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.UUID;
import java.util.function.Supplier;
import org.apache.avro.Schema;
@@ -44,6 +45,7 @@ import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.UUIDUtil;
public class ValueReaders {
@@ -53,6 +55,14 @@ public class ValueReaders {
return NullReader.INSTANCE;
}
+ public static <T> ValueReader<T> constant(T value) {
+ return new ConstantReader<>(value);
+ }
+
+ public static <T> ValueReader<T> replaceWithConstant(ValueReader<?> reader,
T value) {
+ return new ReplaceWithConstantReader<>(reader, value);
+ }
+
public static ValueReader<Boolean> booleans() {
return BooleanReader.INSTANCE;
}
@@ -61,6 +71,10 @@ public class ValueReaders {
return IntegerReader.INSTANCE;
}
+ public static ValueReader<Long> intsAsLongs() {
+ return IntegerAsLongReader.INSTANCE;
+ }
+
public static ValueReader<Long> longs() {
return LongReader.INSTANCE;
}
@@ -69,6 +83,10 @@ public class ValueReaders {
return FloatReader.INSTANCE;
}
+ public static ValueReader<Double> floatsAsDoubles() {
+ return FloatAsDoubleReader.INSTANCE;
+ }
+
public static ValueReader<Double> doubles() {
return DoubleReader.INSTANCE;
}
@@ -125,6 +143,10 @@ public class ValueReaders {
return new UnionReader(readers);
}
+ public static ValueReader<Long> positions() {
+ return new PositionReader();
+ }
+
public static <T> ValueReader<Collection<T>> array(ValueReader<T>
elementReader) {
return new ArrayReader<>(elementReader);
}
@@ -149,6 +171,16 @@ public class ValueReaders {
return new IndexedRecordReader<>(readers, recordClass, recordSchema);
}
+ public static ValueReader<?> record(
+ Schema recordSchema, List<Pair<Integer, ValueReader<?>>> readPlan) {
+ return new PlannedRecordReader(recordSchema, readPlan);
+ }
+
+ public static <R extends IndexedRecord> ValueReader<R> record(
+ Schema recordSchema, Class<R> recordClass, List<Pair<Integer,
ValueReader<?>>> readPlan) {
+ return new PlannedIndexedReader<>(recordSchema, recordClass, readPlan);
+ }
+
private static class NullReader implements ValueReader<Object> {
private static final NullReader INSTANCE = new NullReader();
@@ -159,6 +191,47 @@ public class ValueReaders {
decoder.readNull();
return null;
}
+
+ @Override
+ public void skip(Decoder decoder) throws IOException {
+ decoder.readNull();
+ }
+ }
+
+ private static class ConstantReader<T> implements ValueReader<T> {
+ private final T constant;
+
+ private ConstantReader(T constant) {
+ this.constant = constant;
+ }
+
+ @Override
+ public T read(Decoder decoder, Object reuse) throws IOException {
+ return constant;
+ }
+
+ @Override
+ public void skip(Decoder decoder) throws IOException {}
+ }
+
+ private static class ReplaceWithConstantReader<T> extends ConstantReader<T> {
+ private final ValueReader<?> replaced;
+
+ private ReplaceWithConstantReader(ValueReader<?> replaced, T constant) {
+ super(constant);
+ this.replaced = replaced;
+ }
+
+ @Override
+ public T read(Decoder decoder, Object reuse) throws IOException {
+ replaced.read(decoder, reuse);
+ return super.read(decoder, reuse);
+ }
+
+ @Override
+ public void skip(Decoder decoder) throws IOException {
+ replaced.skip(decoder);
+ }
}
private static class BooleanReader implements ValueReader<Boolean> {
@@ -170,6 +243,11 @@ public class ValueReaders {
public Boolean read(Decoder decoder, Object ignored) throws IOException {
return decoder.readBoolean();
}
+
+ @Override
+ public void skip(Decoder decoder) throws IOException {
+ decoder.readBoolean();
+ }
}
private static class IntegerReader implements ValueReader<Integer> {
@@ -181,6 +259,27 @@ public class ValueReaders {
public Integer read(Decoder decoder, Object ignored) throws IOException {
return decoder.readInt();
}
+
+ @Override
+ public void skip(Decoder decoder) throws IOException {
+ decoder.readInt();
+ }
+ }
+
+ private static class IntegerAsLongReader implements ValueReader<Long> {
+ private static final IntegerAsLongReader INSTANCE = new
IntegerAsLongReader();
+
+ private IntegerAsLongReader() {}
+
+ @Override
+ public Long read(Decoder decoder, Object ignored) throws IOException {
+ return (long) decoder.readInt();
+ }
+
+ @Override
+ public void skip(Decoder decoder) throws IOException {
+ decoder.readInt();
+ }
}
private static class LongReader implements ValueReader<Long> {
@@ -192,6 +291,11 @@ public class ValueReaders {
public Long read(Decoder decoder, Object ignored) throws IOException {
return decoder.readLong();
}
+
+ @Override
+ public void skip(Decoder decoder) throws IOException {
+ decoder.readLong();
+ }
}
private static class FloatReader implements ValueReader<Float> {
@@ -203,6 +307,27 @@ public class ValueReaders {
public Float read(Decoder decoder, Object ignored) throws IOException {
return decoder.readFloat();
}
+
+ @Override
+ public void skip(Decoder decoder) throws IOException {
+ decoder.skipFixed(4);
+ }
+ }
+
+ private static class FloatAsDoubleReader implements ValueReader<Double> {
+ private static final FloatAsDoubleReader INSTANCE = new
FloatAsDoubleReader();
+
+ private FloatAsDoubleReader() {}
+
+ @Override
+ public Double read(Decoder decoder, Object ignored) throws IOException {
+ return (double) decoder.readFloat();
+ }
+
+ @Override
+ public void skip(Decoder decoder) throws IOException {
+ decoder.skipFixed(4);
+ }
}
private static class DoubleReader implements ValueReader<Double> {
@@ -214,6 +339,11 @@ public class ValueReaders {
public Double read(Decoder decoder, Object ignored) throws IOException {
return decoder.readDouble();
}
+
+ @Override
+ public void skip(Decoder decoder) throws IOException {
+ decoder.skipFixed(8);
+ }
}
private static class StringReader implements ValueReader<String> {
@@ -231,6 +361,11 @@ public class ValueReaders {
// byte[] bytes = new byte[length];
// decoder.readFixed(bytes, 0, length);
}
+
+ @Override
+ public void skip(Decoder decoder) throws IOException {
+ decoder.skipString();
+ }
}
private static class Utf8Reader implements ValueReader<Utf8> {
@@ -250,6 +385,11 @@ public class ValueReaders {
// byte[] bytes = new byte[length];
// decoder.readFixed(bytes, 0, length);
}
+
+ @Override
+ public void skip(Decoder decoder) throws IOException {
+ decoder.skipString();
+ }
}
private static class UUIDReader implements ValueReader<UUID> {
@@ -275,6 +415,11 @@ public class ValueReaders {
return UUIDUtil.convert(buffer);
}
+
+ @Override
+ public void skip(Decoder decoder) throws IOException {
+ decoder.skipFixed(16);
+ }
}
private static class FixedReader implements ValueReader<byte[]> {
@@ -298,6 +443,11 @@ public class ValueReaders {
decoder.readFixed(bytes, 0, length);
return bytes;
}
+
+ @Override
+ public void skip(Decoder decoder) throws IOException {
+ decoder.skipFixed(length);
+ }
}
private static class GenericFixedReader implements ValueReader<GenericData.Fixed> {
@@ -323,6 +473,11 @@ public class ValueReaders {
decoder.readFixed(bytes, 0, length);
return new GenericData.Fixed(schema, bytes);
}
+
+ @Override
+ public void skip(Decoder decoder) throws IOException {
+ decoder.skipFixed(length);
+ }
}
private static class BytesReader implements ValueReader<byte[]> {
@@ -344,6 +499,11 @@ public class ValueReaders {
// decoder.readFixed(bytes, 0, length);
// return bytes;
}
+
+ @Override
+ public void skip(Decoder decoder) throws IOException {
+ decoder.skipBytes();
+ }
}
private static class ByteBufferReader implements ValueReader<ByteBuffer> {
@@ -364,6 +524,11 @@ public class ValueReaders {
// decoder.readFixed(bytes, 0, length);
// return bytes;
}
+
+ @Override
+ public void skip(Decoder decoder) throws IOException {
+ decoder.skipBytes();
+ }
}
private static class DecimalReader implements ValueReader<BigDecimal> {
@@ -381,6 +546,11 @@ public class ValueReaders {
byte[] bytes = bytesReader.read(decoder, null);
return new BigDecimal(new BigInteger(bytes), scale);
}
+
+ @Override
+ public void skip(Decoder decoder) throws IOException {
+ bytesReader.skip(decoder);
+ }
}
private static class UnionReader implements ValueReader<Object> {
@@ -398,6 +568,12 @@ public class ValueReaders {
int index = decoder.readIndex();
return readers[index].read(decoder, reuse);
}
+
+ @Override
+ public void skip(Decoder decoder) throws IOException {
+ int index = decoder.readIndex();
+ readers[index].skip(decoder);
+ }
}
private static class EnumReader implements ValueReader<String> {
@@ -415,6 +591,11 @@ public class ValueReaders {
int index = decoder.readEnum();
return symbols[index];
}
+
+ @Override
+ public void skip(Decoder decoder) throws IOException {
+ decoder.readEnum();
+ }
}
private static class ArrayReader<T> implements ValueReader<Collection<T>> {
@@ -456,6 +637,16 @@ public class ValueReaders {
return resultList;
}
+
+ @Override
+ public void skip(Decoder decoder) throws IOException {
+ long itemsToSkip;
+ while ((itemsToSkip = decoder.skipArray()) != 0) {
+ for (int i = 0; i < itemsToSkip; i += 1) {
+ elementReader.skip(decoder);
+ }
+ }
+ }
}
private static class ArrayMapReader<K, V> implements ValueReader<Map<K, V>> {
@@ -509,6 +700,17 @@ public class ValueReaders {
return resultMap;
}
+
+ @Override
+ public void skip(Decoder decoder) throws IOException {
+ long itemsToSkip;
+ while ((itemsToSkip = decoder.skipArray()) != 0) {
+ for (int i = 0; i < itemsToSkip; i += 1) {
+ keyReader.skip(decoder);
+ valueReader.skip(decoder);
+ }
+ }
+ }
}
private static class MapReader<K, V> implements ValueReader<Map<K, V>> {
@@ -562,6 +764,133 @@ public class ValueReaders {
return resultMap;
}
+
+ @Override
+ public void skip(Decoder decoder) throws IOException {
+ long itemsToSkip;
+ while ((itemsToSkip = decoder.skipMap()) != 0) {
+ for (int i = 0; i < itemsToSkip; i += 1) {
+ keyReader.skip(decoder);
+ valueReader.skip(decoder);
+ }
+ }
+ }
+ }
+
+ public abstract static class PlannedStructReader<S>
+ implements ValueReader<S>, SupportsRowPosition {
+ private final ValueReader<?>[] readers;
+ private final Integer[] positions;
+
+ protected PlannedStructReader(List<Pair<Integer, ValueReader<?>>> readPlan) {
+ this.readers = readPlan.stream().map(Pair::second).toArray(ValueReader[]::new);
+ this.positions = readPlan.stream().map(Pair::first).toArray(Integer[]::new);
+ }
+
+ @Override
+ public void setRowPositionSupplier(Supplier<Long> posSupplier) {
+ for (ValueReader<?> reader : readers) {
+ if (reader instanceof SupportsRowPosition) {
+ ((SupportsRowPosition) reader).setRowPositionSupplier(posSupplier);
+ }
+ }
+ }
+
+ protected abstract S reuseOrCreate(Object reuse);
+
+ protected abstract Object get(S struct, int pos);
+
+ protected abstract void set(S struct, int pos, Object value);
+
+ @Override
+ public S read(Decoder decoder, Object reuse) throws IOException {
+ S struct = reuseOrCreate(reuse);
+
+ for (int i = 0; i < readers.length; i += 1) {
+ if (positions[i] != null) {
+ Object reusedValue = get(struct, positions[i]);
+ set(struct, positions[i], readers[i].read(decoder, reusedValue));
+ } else {
+ // if pos is null, the value is not projected
+ readers[i].skip(decoder);
+ }
+ }
+
+ return struct;
+ }
+
+ @Override
+ public void skip(Decoder decoder) throws IOException {
+ for (int i = 0; i < readers.length; i += 1) {
+ readers[i].skip(decoder);
+ }
+ }
+ }
+
+ private static class PlannedRecordReader extends PlannedStructReader<GenericData.Record> {
+ private final Schema recordSchema;
+
+ private PlannedRecordReader(Schema recordSchema, List<Pair<Integer, ValueReader<?>>> readPlan) {
+ super(readPlan);
+ this.recordSchema = recordSchema;
+ }
+
+ @Override
+ protected GenericData.Record reuseOrCreate(Object reuse) {
+ if (reuse instanceof GenericData.Record) {
+ return (GenericData.Record) reuse;
+ } else {
+ return new GenericData.Record(recordSchema);
+ }
+ }
+
+ @Override
+ protected Object get(GenericData.Record struct, int pos) {
+ return struct.get(pos);
+ }
+
+ @Override
+ protected void set(GenericData.Record struct, int pos, Object value) {
+ struct.put(pos, value);
+ }
+ }
+
+ private static class PlannedIndexedReader<R extends IndexedRecord>
+ extends PlannedStructReader<R> {
+ private final Class<R> recordClass;
+ private final DynConstructors.Ctor<R> ctor;
+ private final Schema schema;
+
+ PlannedIndexedReader(
+ Schema recordSchema, Class<R> recordClass, List<Pair<Integer, ValueReader<?>>> readPlan) {
+ super(readPlan);
+ this.recordClass = recordClass;
+ this.ctor =
+ DynConstructors.builder(IndexedRecord.class)
+ .hiddenImpl(recordClass, Schema.class)
+ .hiddenImpl(recordClass)
+ .build();
+ this.schema = recordSchema;
+ }
+
+ @Override
+ protected R reuseOrCreate(Object reuse) {
+ if (recordClass.isInstance(reuse)) {
+ return recordClass.cast(reuse);
+ } else {
+ return ctor.newInstance(schema);
+ }
+ }
+
+ @Override
+ protected Object get(R struct, int pos) {
+ return struct.get(pos);
+ }
+
+ @Override
+ protected void set(R struct, int pos, Object value) {
+ struct.put(pos, value);
+ }
}
public abstract static class StructReader<S> implements ValueReader<S>, SupportsRowPosition {
@@ -577,10 +906,11 @@ public class ValueReaders {
List<Schema.Field> fields = schema.getFields();
for (int pos = 0; pos < fields.size(); pos += 1) {
Schema.Field field = fields.get(pos);
- if (AvroSchemaUtil.getFieldId(field) == MetadataColumns.ROW_POSITION.fieldId()) {
+ if (Objects.equals(AvroSchemaUtil.fieldId(field), MetadataColumns.ROW_POSITION.fieldId())) {
// track where the _pos field is located for setRowPositionSupplier
this.posField = pos;
- } else if (AvroSchemaUtil.getFieldId(field) == MetadataColumns.IS_DELETED.fieldId()) {
+ } else if (Objects.equals(
+ AvroSchemaUtil.fieldId(field), MetadataColumns.IS_DELETED.fieldId())) {
isDeletedColumnPos = pos;
}
}
@@ -622,19 +952,12 @@ public class ValueReaders {
@Override
public void setRowPositionSupplier(Supplier<Long> posSupplier) {
if (posField >= 0) {
- long startingPos = posSupplier.get();
- this.readers[posField] = new PositionReader(startingPos);
- for (ValueReader<?> reader : readers) {
- if (reader instanceof SupportsRowPosition) {
- ((SupportsRowPosition) reader).setRowPositionSupplier(() -> startingPos);
- }
- }
+ this.readers[posField] = new PositionReader();
+ }
- } else {
- for (ValueReader<?> reader : readers) {
- if (reader instanceof SupportsRowPosition) {
- ((SupportsRowPosition) reader).setRowPositionSupplier(posSupplier);
- }
+ for (ValueReader<?> reader : readers) {
+ if (reader instanceof SupportsRowPosition) {
+ ((SupportsRowPosition) reader).setRowPositionSupplier(posSupplier);
}
}
}
@@ -739,17 +1062,21 @@ public class ValueReaders {
}
}
- static class PositionReader implements ValueReader<Long> {
- private long currentPosition;
-
- PositionReader(long rowPosition) {
- this.currentPosition = rowPosition - 1;
- }
+ static class PositionReader implements ValueReader<Long>, SupportsRowPosition {
+ private long currentPosition = 0;
@Override
public Long read(Decoder ignored, Object reuse) throws IOException {
- currentPosition += 1;
+ this.currentPosition += 1;
return currentPosition;
}
+
+ @Override
+ public void skip(Decoder ignored) throws IOException {}
+
+ @Override
+ public void setRowPositionSupplier(Supplier<Long> posSupplier) {
+ this.currentPosition = posSupplier.get() - 1;
+ }
}
}
diff --git a/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java
index f4c7ee883e..2af098445c 100644
--- a/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java
+++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java
@@ -80,10 +80,7 @@ public class TestAvroNameMapping extends TestAvroReadProjection {
Record projected = writeAndRead(writeSchema, readSchema, record, nameMapping);
// field id 5 comes from read schema
- Assertions.assertThat(projected.getSchema().getField("location_r5"))
- .as("Field missing from table mapping is renamed")
- .isNotNull();
- Assertions.assertThat(projected.get("location_r5"))
+ Assertions.assertThat(projected.get("location"))
.as("location field should not be read")
.isNull();
Assertions.assertThat(projected.get("id")).isEqualTo(34L);
@@ -105,10 +102,7 @@ public class TestAvroNameMapping extends TestAvroReadProjection {
projected = writeAndRead(writeSchema, readSchema, record, nameMapping);
Record projectedL1 = ((Map<String, Record>) projected.get("location")).get("l1");
- Assertions.assertThat(projectedL1.getSchema().getField("long_r2"))
- .as("Field missing from table mapping is renamed")
- .isNotNull();
- Assertions.assertThat(projectedL1.get("long_r2"))
+ Assertions.assertThat(projectedL1.get("long"))
.as("location.value.long, should not be read")
.isNull();
}
@@ -187,8 +181,7 @@ public class TestAvroNameMapping extends TestAvroReadProjection {
Comparators.charSequences().compare("k2", (CharSequence) projectedKey.get("k2")))
.isEqualTo(0);
Assertions.assertThat(projectedValue.get("lat")).isEqualTo(52.995143f);
- Assertions.assertThat(projectedValue.getSchema().getField("long_r2")).isNotNull();
- Assertions.assertThat(projectedValue.get("long_r2")).isNull();
+ Assertions.assertThat(projectedValue.get("long")).isNull();
}
@Test
@@ -249,10 +242,7 @@ public class TestAvroNameMapping extends TestAvroReadProjection {
Schema readSchema = writeSchema;
Record projected = writeAndRead(writeSchema, readSchema, record, nameMapping);
- Assertions.assertThat(projected.getSchema().getField("point_r22"))
- .as("Field missing from table mapping is renamed")
- .isNotNull();
+ Assertions.assertThat(projected.get("point_r22")).as("point field is not projected").isNull();
+ Assertions.assertThat(projected.get("point")).as("point is not projected").isNull();
Assertions.assertThat(projected.get("id")).isEqualTo(34L);
// point array is partially projected
nameMapping =
@@ -269,11 +259,8 @@ public class TestAvroNameMapping extends TestAvroReadProjection {
projected = writeAndRead(writeSchema, readSchema, record, nameMapping);
Record point = ((List<Record>) projected.get("point")).get(0);
- Assertions.assertThat(point.getSchema().getField("y_r18"))
- .as("Field missing from table mapping is renamed")
- .isNotNull();
Assertions.assertThat(point.get("x")).as("point.x is projected").isEqualTo(1);
- Assertions.assertThat(point.get("y_r18")).as("point.y is not projected").isNull();
+ Assertions.assertThat(point.get("y")).as("point.y is not projected").isNull();
Assertions.assertThat(projected.get("id")).isEqualTo(34L);
}