rdblue commented on code in PR #5704:
URL: https://github.com/apache/iceberg/pull/5704#discussion_r985309872
##########
core/src/main/java/org/apache/iceberg/avro/PruneColumns.java:
##########
@@ -345,4 +345,27 @@ private static boolean
isOptionSchemaWithNonNullFirstOption(Schema schema) {
return AvroSchemaUtil.isOptionSchema(schema)
&& schema.getTypes().get(0).getType() != Schema.Type.NULL;
}
+
+ // for primitive types, the visitResult will be null, we want to reuse the
primitive types from
+ // the original
+ // schema, while for nested types, we want to use the visitResult because
they have content from
+ // the previous
+ // recursive calls.
+ private static Schema copyUnion(Schema record, List<Schema> visitResults) {
+ List<Schema> branches =
Lists.newArrayListWithExpectedSize(visitResults.size());
+ for (int i = 0; i < visitResults.size(); i++) {
+ if (visitResults.get(i) == null) {
+ branches.add(record.getTypes().get(i));
+ } else {
+ branches.add(visitResults.get(i));
+ }
+ }
+ Schema schema = Schema.createUnion(branches);
+ if (record.getObjectProp(SchemaToType.AVRO_FIELD_NAME_TO_ICEBERG_ID) !=
null) {
Review Comment:
@yiqiangin, @wmoustafa, I would expect this to apply the name mapping, but
instead it passes on a custom schema property. I don't think that this approach
is correct.
The Avro implementation for name mapping is a little odd. For Parquet and
ORC, there's a class that rewrites the schema and adds IDs. It looks like
instead of taking that approach, the Avro implementer added name mapping to
this class. That's okay, but that means that the name mapping should be applied
here for the union work.
We want to create guarantees that we can rely on to simplify other code. In
this case, once `PruneColumns` is done, we're guaranteed to have an Avro schema
with the correct field IDs annotated throughout.
To do that, I think the field ID should be added to each Schema that is a
branch of the union:
```java
List<Schema> unionTypes = union.getTypes();
for (int ind = 0; ind < branches.size(); ind += 1) {
Schema branchSchema = visitResults.get(ind);
if (branchSchema == null) {
branchSchema = unionTypes.get(ind);
}
Integer branchId = AvroSchemaUtil.getBranchId(branchSchema, nameMapping,
fieldNames());
if (branchId != null) {
optionSchema.addProp(AvroSchemaUtil.BRANCH_ID_PROP,
String.valueOf(branchId));
}
branches.add(branchSchema)
}
return Schema.createUnion(branches);
// AvroSchemaUtil additions:
public static final String BRANCH_ID_PROP = "branch-id";
static Integer getBranchId(
Schema branch, NameMapping mapping, Iterable<String> parentFieldNames)
{
Object id = branch.getObjectProp(BRANCH_ID_PROP);
if (id != null) {
return toInt(id);
} else if (mapping != null) {
MappedField mappedField = findInMapping(mapping, parentFieldNames,
branch.getName(), branch.getFullName());
if (mappedField != null) {
return mappedField.id();
}
}
return null;
}
private static MappedField findInMapping(NameMapping mapping,
Iterable<String> parentFieldNames, String... nameOpts) {
List<String> names = Lists.newArrayList(parentFieldNames);
for (String name : nameOpts) {
names.add(name);
MappedField field = mapping.find(name);
if (field != null) {
return field;
}
}
return null;
}
```
##########
core/src/main/java/org/apache/iceberg/avro/ProjectionDatumReader.java:
##########
@@ -60,6 +60,7 @@ public void setRowPositionSupplier(Supplier<Long>
posSupplier) {
@Override
public void setSchema(Schema newFileSchema) {
this.fileSchema = newFileSchema;
+ AvroSchemaUtil.convertToDeriveNameMapping(this.fileSchema);
if (nameMapping == null && !AvroSchemaUtil.hasIds(fileSchema)) {
nameMapping = MappingUtil.create(expectedSchema);
Review Comment:
Yes, exactly. See the code I posted above. I think we need to add
`branch-id` to the union branches during pruning.
##########
core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java:
##########
@@ -82,11 +83,45 @@ private static <T> T visitRecord(
private static <T> T visitUnion(Type type, Schema union,
AvroSchemaWithTypeVisitor<T> visitor) {
List<Schema> types = union.getTypes();
List<T> options = Lists.newArrayListWithExpectedSize(types.size());
- for (Schema branch : types) {
- if (branch.getType() == Schema.Type.NULL) {
- options.add(visit((Type) null, branch, visitor));
- } else {
- options.add(visit(type, branch, visitor));
+
+ // simple union case
+ if (AvroSchemaUtil.isOptionSchema(union)) {
+ for (Schema branch : types) {
+ if (branch.getType() == Schema.Type.NULL) {
+ options.add(visit((Type) null, branch, visitor));
+ } else {
+ options.add(visit(type, branch, visitor));
+ }
+ }
+ } else { // complex union case
+ Preconditions.checkArgument(
+ type instanceof Types.StructType,
+ "Cannot visit invalid Iceberg type: %s for Avro complex union type:
%s",
+ type,
+ union);
+ Map<String, Integer> fieldNameToId =
+ (Map)
union.getObjectProp(SchemaToType.AVRO_FIELD_NAME_TO_ICEBERG_ID);
Review Comment:
This matching should be done using branch IDs, not a map like this.
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java:
##########
@@ -285,4 +300,87 @@ protected void set(InternalRow struct, int pos, Object
value) {
}
}
}
+
+ private static class ComplexUnionReader implements ValueReader<InternalRow> {
Review Comment:
What is specific to Spark about this? Can we use an approach like the struct
reader and have a generic one that is extended by Spark, Flink, etc. to make
the type concrete?
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java:
##########
@@ -24,13 +24,17 @@
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import org.apache.avro.Schema;
Review Comment:
I'd prefer not passing in the Avro schema. I think that the behavior should
be that the `AvroSchemaWithTypeVisitor` visits each union branch and produces a
`ValueReader`. Then the visitor implementation should create the index map and
pass it into the reader. Not passing the schema in should keep the reader
simple.
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java:
##########
@@ -285,4 +300,87 @@ protected void set(InternalRow struct, int pos, Object
value) {
}
}
}
+
+ private static class ComplexUnionReader implements ValueReader<InternalRow> {
+ private static final String UNION_TAG_FIELD_NAME = "tag";
+ private final Schema schema;
+ private final List<Schema> branches;
+ private final ValueReader[] readers;
+ private int nullIndex;
+ private final int[] projectedFieldIdsToIdxInReturnedRow;
+ private boolean isTagFieldProjected;
+ private int numOfFieldsInReturnedRow;
+ private int nullTypeIndex;
+
+ private ComplexUnionReader(Schema schema, List<ValueReader<?>> readers,
Type expected) {
+ this.schema = schema;
+ this.branches = schema.getTypes();
+ this.readers = new ValueReader[readers.size()];
+ for (int i = 0; i < this.readers.length; i += 1) {
+ this.readers[i] = readers.get(i);
+ }
+
+ // checking if NULL type exists in Avro union schema
+ this.nullTypeIndex = -1;
Review Comment:
If there is an index for null, then it should be handled just like any other
value reader, right? It won't be projected, but if the union has the null
index, the reader can be called and will do nothing.
I guess the odd thing is that there isn't a `NullValueReader` that can be
used as a placeholder? I think maybe adding one would be cleaner than adding
special handling for null options.
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java:
##########
@@ -285,4 +300,87 @@ protected void set(InternalRow struct, int pos, Object
value) {
}
}
}
+
+ private static class ComplexUnionReader implements ValueReader<InternalRow> {
+ private static final String UNION_TAG_FIELD_NAME = "tag";
+ private final Schema schema;
+ private final List<Schema> branches;
+ private final ValueReader[] readers;
+ private int nullIndex;
+ private final int[] projectedFieldIdsToIdxInReturnedRow;
+ private boolean isTagFieldProjected;
+ private int numOfFieldsInReturnedRow;
+ private int nullTypeIndex;
+
+ private ComplexUnionReader(Schema schema, List<ValueReader<?>> readers,
Type expected) {
+ this.schema = schema;
+ this.branches = schema.getTypes();
+ this.readers = new ValueReader[readers.size()];
+ for (int i = 0; i < this.readers.length; i += 1) {
+ this.readers[i] = readers.get(i);
+ }
+
+ // checking if NULL type exists in Avro union schema
+ this.nullTypeIndex = -1;
+ for (int i = 0; i < this.schema.getTypes().size(); i++) {
+ Schema alt = this.schema.getTypes().get(i);
+ if (Objects.equals(alt.getType(), Schema.Type.NULL)) {
+ this.nullTypeIndex = i;
+ break;
+ }
+ }
+
+ // Creating an integer array to track the mapping between the index of
fields to be projected
+ // and the index of the value for the field stored in the returned row,
+ // if the value for a field equals to -1, it means the value of this
field should not be
+ // stored
+ // in the returned row
+ int numberOfTypes =
+ this.nullTypeIndex == -1
+ ? this.schema.getTypes().size()
+ : this.schema.getTypes().size() - 1;
+ this.projectedFieldIdsToIdxInReturnedRow = new int[numberOfTypes];
+ Arrays.fill(this.projectedFieldIdsToIdxInReturnedRow, -1);
+ this.numOfFieldsInReturnedRow = 0;
+ this.isTagFieldProjected = false;
+ for (Types.NestedField expectedStructField :
expected.asStructType().fields()) {
+ String fieldName = expectedStructField.name();
+ if (fieldName.equals(UNION_TAG_FIELD_NAME)) {
Review Comment:
Is there a better way to identify the tag field?
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java:
##########
@@ -285,4 +300,87 @@ protected void set(InternalRow struct, int pos, Object
value) {
}
}
}
+
+ private static class ComplexUnionReader implements ValueReader<InternalRow> {
+ private static final String UNION_TAG_FIELD_NAME = "tag";
+ private final Schema schema;
+ private final List<Schema> branches;
+ private final ValueReader[] readers;
+ private int nullIndex;
+ private final int[] projectedFieldIdsToIdxInReturnedRow;
+ private boolean isTagFieldProjected;
+ private int numOfFieldsInReturnedRow;
+ private int nullTypeIndex;
+
+ private ComplexUnionReader(Schema schema, List<ValueReader<?>> readers,
Type expected) {
+ this.schema = schema;
+ this.branches = schema.getTypes();
+ this.readers = new ValueReader[readers.size()];
+ for (int i = 0; i < this.readers.length; i += 1) {
+ this.readers[i] = readers.get(i);
+ }
+
+ // checking if NULL type exists in Avro union schema
+ this.nullTypeIndex = -1;
+ for (int i = 0; i < this.schema.getTypes().size(); i++) {
+ Schema alt = this.schema.getTypes().get(i);
+ if (Objects.equals(alt.getType(), Schema.Type.NULL)) {
+ this.nullTypeIndex = i;
+ break;
+ }
+ }
+
+ // Creating an integer array to track the mapping between the index of
fields to be projected
+ // and the index of the value for the field stored in the returned row,
+ // if the value for a field equals to -1, it means the value of this
field should not be
+ // stored
+ // in the returned row
+ int numberOfTypes =
+ this.nullTypeIndex == -1
+ ? this.schema.getTypes().size()
+ : this.schema.getTypes().size() - 1;
+ this.projectedFieldIdsToIdxInReturnedRow = new int[numberOfTypes];
+ Arrays.fill(this.projectedFieldIdsToIdxInReturnedRow, -1);
+ this.numOfFieldsInReturnedRow = 0;
+ this.isTagFieldProjected = false;
+ for (Types.NestedField expectedStructField :
expected.asStructType().fields()) {
+ String fieldName = expectedStructField.name();
+ if (fieldName.equals(UNION_TAG_FIELD_NAME)) {
+ this.isTagFieldProjected = true;
+ this.numOfFieldsInReturnedRow++;
+ continue;
Review Comment:
Minor: prefer `else` to `continue` when the logic is simple like this.
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java:
##########
@@ -285,4 +300,87 @@ protected void set(InternalRow struct, int pos, Object
value) {
}
}
}
+
+ private static class ComplexUnionReader implements ValueReader<InternalRow> {
+ private static final String UNION_TAG_FIELD_NAME = "tag";
+ private final Schema schema;
+ private final List<Schema> branches;
+ private final ValueReader[] readers;
+ private int nullIndex;
+ private final int[] projectedFieldIdsToIdxInReturnedRow;
+ private boolean isTagFieldProjected;
+ private int numOfFieldsInReturnedRow;
+ private int nullTypeIndex;
+
+ private ComplexUnionReader(Schema schema, List<ValueReader<?>> readers,
Type expected) {
+ this.schema = schema;
+ this.branches = schema.getTypes();
+ this.readers = new ValueReader[readers.size()];
+ for (int i = 0; i < this.readers.length; i += 1) {
+ this.readers[i] = readers.get(i);
+ }
+
+ // checking if NULL type exists in Avro union schema
+ this.nullTypeIndex = -1;
+ for (int i = 0; i < this.schema.getTypes().size(); i++) {
+ Schema alt = this.schema.getTypes().get(i);
+ if (Objects.equals(alt.getType(), Schema.Type.NULL)) {
+ this.nullTypeIndex = i;
+ break;
+ }
+ }
+
+ // Creating an integer array to track the mapping between the index of
fields to be projected
+ // and the index of the value for the field stored in the returned row,
+ // if the value for a field equals to -1, it means the value of this
field should not be
+ // stored
+ // in the returned row
+ int numberOfTypes =
+ this.nullTypeIndex == -1
+ ? this.schema.getTypes().size()
+ : this.schema.getTypes().size() - 1;
+ this.projectedFieldIdsToIdxInReturnedRow = new int[numberOfTypes];
+ Arrays.fill(this.projectedFieldIdsToIdxInReturnedRow, -1);
+ this.numOfFieldsInReturnedRow = 0;
+ this.isTagFieldProjected = false;
+ for (Types.NestedField expectedStructField :
expected.asStructType().fields()) {
+ String fieldName = expectedStructField.name();
+ if (fieldName.equals(UNION_TAG_FIELD_NAME)) {
+ this.isTagFieldProjected = true;
+ this.numOfFieldsInReturnedRow++;
+ continue;
+ }
+ int projectedFieldIndex = Integer.valueOf(fieldName.substring(5));
Review Comment:
This should not parse field names. It should instead use field IDs from the
Iceberg schema and branch IDs from the Avro schema.
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java:
##########
@@ -285,4 +300,87 @@ protected void set(InternalRow struct, int pos, Object
value) {
}
}
}
+
+ private static class ComplexUnionReader implements ValueReader<InternalRow> {
+ private static final String UNION_TAG_FIELD_NAME = "tag";
+ private final Schema schema;
+ private final List<Schema> branches;
+ private final ValueReader[] readers;
+ private int nullIndex;
+ private final int[] projectedFieldIdsToIdxInReturnedRow;
+ private boolean isTagFieldProjected;
+ private int numOfFieldsInReturnedRow;
+ private int nullTypeIndex;
+
+ private ComplexUnionReader(Schema schema, List<ValueReader<?>> readers,
Type expected) {
+ this.schema = schema;
+ this.branches = schema.getTypes();
+ this.readers = new ValueReader[readers.size()];
+ for (int i = 0; i < this.readers.length; i += 1) {
+ this.readers[i] = readers.get(i);
+ }
+
+ // checking if NULL type exists in Avro union schema
+ this.nullTypeIndex = -1;
+ for (int i = 0; i < this.schema.getTypes().size(); i++) {
+ Schema alt = this.schema.getTypes().get(i);
+ if (Objects.equals(alt.getType(), Schema.Type.NULL)) {
+ this.nullTypeIndex = i;
+ break;
+ }
+ }
+
+ // Creating an integer array to track the mapping between the index of
fields to be projected
+ // and the index of the value for the field stored in the returned row,
+ // if the value for a field equals to -1, it means the value of this
field should not be
+ // stored
+ // in the returned row
+ int numberOfTypes =
+ this.nullTypeIndex == -1
+ ? this.schema.getTypes().size()
+ : this.schema.getTypes().size() - 1;
+ this.projectedFieldIdsToIdxInReturnedRow = new int[numberOfTypes];
+ Arrays.fill(this.projectedFieldIdsToIdxInReturnedRow, -1);
+ this.numOfFieldsInReturnedRow = 0;
+ this.isTagFieldProjected = false;
+ for (Types.NestedField expectedStructField :
expected.asStructType().fields()) {
+ String fieldName = expectedStructField.name();
+ if (fieldName.equals(UNION_TAG_FIELD_NAME)) {
+ this.isTagFieldProjected = true;
+ this.numOfFieldsInReturnedRow++;
+ continue;
+ }
+ int projectedFieldIndex = Integer.valueOf(fieldName.substring(5));
+ this.projectedFieldIdsToIdxInReturnedRow[projectedFieldIndex] =
+ this.numOfFieldsInReturnedRow++;
+ }
+ }
+
+ @Override
+ public InternalRow read(Decoder decoder, Object reuse) throws IOException {
+ int index = decoder.readIndex();
+ if (index == nullTypeIndex) {
+ // if it is a null data, directly return null as the whole union result
+ // we know for sure it is a null so the casting will always work.
+ return (InternalRow) readers[nullTypeIndex].read(decoder, reuse);
Review Comment:
I assume that this always returns null, but it is really weird to return the
result of a reader directly.
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java:
##########
@@ -285,4 +300,87 @@ protected void set(InternalRow struct, int pos, Object
value) {
}
}
}
+
+ private static class ComplexUnionReader implements ValueReader<InternalRow> {
+ private static final String UNION_TAG_FIELD_NAME = "tag";
+ private final Schema schema;
+ private final List<Schema> branches;
+ private final ValueReader[] readers;
+ private int nullIndex;
+ private final int[] projectedFieldIdsToIdxInReturnedRow;
+ private boolean isTagFieldProjected;
+ private int numOfFieldsInReturnedRow;
+ private int nullTypeIndex;
+
+ private ComplexUnionReader(Schema schema, List<ValueReader<?>> readers,
Type expected) {
+ this.schema = schema;
+ this.branches = schema.getTypes();
+ this.readers = new ValueReader[readers.size()];
+ for (int i = 0; i < this.readers.length; i += 1) {
+ this.readers[i] = readers.get(i);
+ }
+
+ // checking if NULL type exists in Avro union schema
+ this.nullTypeIndex = -1;
+ for (int i = 0; i < this.schema.getTypes().size(); i++) {
+ Schema alt = this.schema.getTypes().get(i);
+ if (Objects.equals(alt.getType(), Schema.Type.NULL)) {
+ this.nullTypeIndex = i;
+ break;
+ }
+ }
+
+ // Creating an integer array to track the mapping between the index of
fields to be projected
+ // and the index of the value for the field stored in the returned row,
+ // if the value for a field equals to -1, it means the value of this
field should not be
+ // stored
+ // in the returned row
+ int numberOfTypes =
+ this.nullTypeIndex == -1
+ ? this.schema.getTypes().size()
+ : this.schema.getTypes().size() - 1;
+ this.projectedFieldIdsToIdxInReturnedRow = new int[numberOfTypes];
+ Arrays.fill(this.projectedFieldIdsToIdxInReturnedRow, -1);
+ this.numOfFieldsInReturnedRow = 0;
+ this.isTagFieldProjected = false;
+ for (Types.NestedField expectedStructField :
expected.asStructType().fields()) {
+ String fieldName = expectedStructField.name();
+ if (fieldName.equals(UNION_TAG_FIELD_NAME)) {
+ this.isTagFieldProjected = true;
+ this.numOfFieldsInReturnedRow++;
+ continue;
+ }
+ int projectedFieldIndex = Integer.valueOf(fieldName.substring(5));
+ this.projectedFieldIdsToIdxInReturnedRow[projectedFieldIndex] =
+ this.numOfFieldsInReturnedRow++;
+ }
+ }
+
+ @Override
+ public InternalRow read(Decoder decoder, Object reuse) throws IOException {
+ int index = decoder.readIndex();
+ if (index == nullTypeIndex) {
+ // if it is a null data, directly return null as the whole union result
+ // we know for sure it is a null so the casting will always work.
+ return (InternalRow) readers[nullTypeIndex].read(decoder, reuse);
Review Comment:
I assume that this always returns null, but it is really weird to return the
result of a reader directly.
What if the tag was projected? Why does this not produce
`InternalRow(nullIndex, null, null, ... null)`?
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java:
##########
@@ -285,4 +300,87 @@ protected void set(InternalRow struct, int pos, Object
value) {
}
}
}
+
+ private static class ComplexUnionReader implements ValueReader<InternalRow> {
+ private static final String UNION_TAG_FIELD_NAME = "tag";
+ private final Schema schema;
+ private final List<Schema> branches;
+ private final ValueReader[] readers;
+ private int nullIndex;
+ private final int[] projectedFieldIdsToIdxInReturnedRow;
+ private boolean isTagFieldProjected;
+ private int numOfFieldsInReturnedRow;
+ private int nullTypeIndex;
+
+ private ComplexUnionReader(Schema schema, List<ValueReader<?>> readers,
Type expected) {
+ this.schema = schema;
+ this.branches = schema.getTypes();
+ this.readers = new ValueReader[readers.size()];
+ for (int i = 0; i < this.readers.length; i += 1) {
+ this.readers[i] = readers.get(i);
+ }
+
+ // checking if NULL type exists in Avro union schema
+ this.nullTypeIndex = -1;
+ for (int i = 0; i < this.schema.getTypes().size(); i++) {
+ Schema alt = this.schema.getTypes().get(i);
+ if (Objects.equals(alt.getType(), Schema.Type.NULL)) {
+ this.nullTypeIndex = i;
+ break;
+ }
+ }
+
+ // Creating an integer array to track the mapping between the index of
fields to be projected
+ // and the index of the value for the field stored in the returned row,
+ // if the value for a field equals to -1, it means the value of this
field should not be
+ // stored
+ // in the returned row
+ int numberOfTypes =
+ this.nullTypeIndex == -1
+ ? this.schema.getTypes().size()
+ : this.schema.getTypes().size() - 1;
+ this.projectedFieldIdsToIdxInReturnedRow = new int[numberOfTypes];
+ Arrays.fill(this.projectedFieldIdsToIdxInReturnedRow, -1);
+ this.numOfFieldsInReturnedRow = 0;
+ this.isTagFieldProjected = false;
+ for (Types.NestedField expectedStructField :
expected.asStructType().fields()) {
+ String fieldName = expectedStructField.name();
+ if (fieldName.equals(UNION_TAG_FIELD_NAME)) {
+ this.isTagFieldProjected = true;
+ this.numOfFieldsInReturnedRow++;
+ continue;
+ }
+ int projectedFieldIndex = Integer.valueOf(fieldName.substring(5));
+ this.projectedFieldIdsToIdxInReturnedRow[projectedFieldIndex] =
+ this.numOfFieldsInReturnedRow++;
+ }
+ }
+
+ @Override
+ public InternalRow read(Decoder decoder, Object reuse) throws IOException {
+ int index = decoder.readIndex();
+ if (index == nullTypeIndex) {
+ // if it is a null data, directly return null as the whole union result
+ // we know for sure it is a null so the casting will always work.
+ return (InternalRow) readers[nullTypeIndex].read(decoder, reuse);
+ }
+
+ // otherwise, we need to return an InternalRow as a struct data
+ InternalRow struct = new GenericInternalRow(numOfFieldsInReturnedRow);
Review Comment:
Readers need to support an option to reuse the row. You can see how in the
struct reader.
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java:
##########
@@ -285,4 +300,87 @@ protected void set(InternalRow struct, int pos, Object
value) {
}
}
}
+
+ private static class ComplexUnionReader implements ValueReader<InternalRow> {
+ private static final String UNION_TAG_FIELD_NAME = "tag";
+ private final Schema schema;
+ private final List<Schema> branches;
+ private final ValueReader[] readers;
+ private int nullIndex;
+ private final int[] projectedFieldIdsToIdxInReturnedRow;
+ private boolean isTagFieldProjected;
+ private int numOfFieldsInReturnedRow;
+ private int nullTypeIndex;
+
+ private ComplexUnionReader(Schema schema, List<ValueReader<?>> readers,
Type expected) {
+ this.schema = schema;
+ this.branches = schema.getTypes();
+ this.readers = new ValueReader[readers.size()];
+ for (int i = 0; i < this.readers.length; i += 1) {
+ this.readers[i] = readers.get(i);
+ }
+
+ // checking if NULL type exists in Avro union schema
+ this.nullTypeIndex = -1;
+ for (int i = 0; i < this.schema.getTypes().size(); i++) {
+ Schema alt = this.schema.getTypes().get(i);
+ if (Objects.equals(alt.getType(), Schema.Type.NULL)) {
+ this.nullTypeIndex = i;
+ break;
+ }
+ }
+
+ // Creating an integer array to track the mapping between the index of
fields to be projected
+ // and the index of the value for the field stored in the returned row,
+ // if the value for a field equals to -1, it means the value of this
field should not be
+ // stored
+ // in the returned row
+ int numberOfTypes =
+ this.nullTypeIndex == -1
+ ? this.schema.getTypes().size()
+ : this.schema.getTypes().size() - 1;
+ this.projectedFieldIdsToIdxInReturnedRow = new int[numberOfTypes];
+ Arrays.fill(this.projectedFieldIdsToIdxInReturnedRow, -1);
+ this.numOfFieldsInReturnedRow = 0;
+ this.isTagFieldProjected = false;
+ for (Types.NestedField expectedStructField :
expected.asStructType().fields()) {
+ String fieldName = expectedStructField.name();
+ if (fieldName.equals(UNION_TAG_FIELD_NAME)) {
+ this.isTagFieldProjected = true;
+ this.numOfFieldsInReturnedRow++;
+ continue;
+ }
+ int projectedFieldIndex = Integer.valueOf(fieldName.substring(5));
+ this.projectedFieldIdsToIdxInReturnedRow[projectedFieldIndex] =
+ this.numOfFieldsInReturnedRow++;
+ }
+ }
+
+ @Override
+ public InternalRow read(Decoder decoder, Object reuse) throws IOException {
Review Comment:
I agree with @wmoustafa, although I think that this is correct to make the
mapping array. That way the implementation is straightforward:
```java
InternalRow row = reuseOrCreate(reuse); // this is where setNullAt happens
int index = decoder.readIndex();
int destIndex = projectionIndexes[index];
if (destIndex >= 0) {
Object value = readers[index].read(decoder, get(reuse, destIndex));
row.update(destIndex, value);
} else {
readers[index].read(decoder, null);
}
return row;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]