yiqiangin commented on code in PR #5704:
URL: https://github.com/apache/iceberg/pull/5704#discussion_r990470991
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java:
##########
@@ -285,4 +300,87 @@ protected void set(InternalRow struct, int pos, Object
value) {
}
}
}
+
+ private static class ComplexUnionReader implements ValueReader<InternalRow> {
+ private static final String UNION_TAG_FIELD_NAME = "tag";
+ private final Schema schema;
+ private final List<Schema> branches;
+ private final ValueReader[] readers;
+ private int nullIndex;
+ private final int[] projectedFieldIdsToIdxInReturnedRow;
+ private boolean isTagFieldProjected;
+ private int numOfFieldsInReturnedRow;
+ private int nullTypeIndex;
+
+ private ComplexUnionReader(Schema schema, List<ValueReader<?>> readers,
Type expected) {
+ this.schema = schema;
+ this.branches = schema.getTypes();
+ this.readers = new ValueReader[readers.size()];
+ for (int i = 0; i < this.readers.length; i += 1) {
+ this.readers[i] = readers.get(i);
+ }
+
+ // checking if NULL type exists in Avro union schema
+ this.nullTypeIndex = -1;
+ for (int i = 0; i < this.schema.getTypes().size(); i++) {
+ Schema alt = this.schema.getTypes().get(i);
+ if (Objects.equals(alt.getType(), Schema.Type.NULL)) {
+ this.nullTypeIndex = i;
+ break;
+ }
+ }
+
+ // Create an integer array that maps the index of each field to be projected
+ // to the index of that field's value in the returned row. An entry of -1
+ // means the value of that field should not be stored in the returned row.
+ int numberOfTypes =
+ this.nullTypeIndex == -1
+ ? this.schema.getTypes().size()
+ : this.schema.getTypes().size() - 1;
+ this.projectedFieldIdsToIdxInReturnedRow = new int[numberOfTypes];
+ Arrays.fill(this.projectedFieldIdsToIdxInReturnedRow, -1);
+ this.numOfFieldsInReturnedRow = 0;
+ this.isTagFieldProjected = false;
+ for (Types.NestedField expectedStructField :
expected.asStructType().fields()) {
+ String fieldName = expectedStructField.name();
+ if (fieldName.equals(UNION_TAG_FIELD_NAME)) {
+ this.isTagFieldProjected = true;
+ this.numOfFieldsInReturnedRow++;
+ continue;
+ }
+ int projectedFieldIndex = Integer.valueOf(fieldName.substring(5));
+ this.projectedFieldIdsToIdxInReturnedRow[projectedFieldIndex] =
+ this.numOfFieldsInReturnedRow++;
+ }
+ }
+
+ @Override
+ public InternalRow read(Decoder decoder, Object reuse) throws IOException {
Review Comment:
Per Ryan's suggestion, the Avro schema is not passed into ComplexUnionReader.
A mapping array is used to track the relationship between each branch type
and the position of its value in the returned row.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]