wmoustafa commented on code in PR #5704:
URL: https://github.com/apache/iceberg/pull/5704#discussion_r979466542
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java:
##########
@@ -285,4 +300,87 @@ protected void set(InternalRow struct, int pos, Object value) {
}
}
}
+
+ private static class ComplexUnionReader implements ValueReader<InternalRow> {
+ private static final String UNION_TAG_FIELD_NAME = "tag";
+ private final Schema schema;
+ private final List<Schema> branches;
+ private final ValueReader[] readers;
+ private int nullIndex;
+ private final int[] projectedFieldIdsToIdxInReturnedRow;
+ private boolean isTagFieldProjected;
+ private int numOfFieldsInReturnedRow;
+ private int nullTypeIndex;
+
+ private ComplexUnionReader(Schema schema, List<ValueReader<?>> readers, Type expected) {
+ this.schema = schema;
+ this.branches = schema.getTypes();
+ this.readers = new ValueReader[readers.size()];
+ for (int i = 0; i < this.readers.length; i += 1) {
+ this.readers[i] = readers.get(i);
+ }
+
+ // checking if NULL type exists in Avro union schema
+ this.nullTypeIndex = -1;
+ for (int i = 0; i < this.schema.getTypes().size(); i++) {
+ Schema alt = this.schema.getTypes().get(i);
+ if (Objects.equals(alt.getType(), Schema.Type.NULL)) {
+ this.nullTypeIndex = i;
+ break;
+ }
+ }
+
+ // Creating an integer array to track the mapping between the index of fields to be projected
+ // and the index of the value for the field stored in the returned row,
+ // if the value for a field equals to -1, it means the value of this field should not be
+ // stored
+ // in the returned row
+ int numberOfTypes =
+ this.nullTypeIndex == -1
+ ? this.schema.getTypes().size()
+ : this.schema.getTypes().size() - 1;
+ this.projectedFieldIdsToIdxInReturnedRow = new int[numberOfTypes];
+ Arrays.fill(this.projectedFieldIdsToIdxInReturnedRow, -1);
+ this.numOfFieldsInReturnedRow = 0;
+ this.isTagFieldProjected = false;
+ for (Types.NestedField expectedStructField : expected.asStructType().fields()) {
+ String fieldName = expectedStructField.name();
+ if (fieldName.equals(UNION_TAG_FIELD_NAME)) {
+ this.isTagFieldProjected = true;
+ this.numOfFieldsInReturnedRow++;
+ continue;
+ }
+ int projectedFieldIndex = Integer.valueOf(fieldName.substring(5));
+ this.projectedFieldIdsToIdxInReturnedRow[projectedFieldIndex] =
+ this.numOfFieldsInReturnedRow++;
+ }
+ }
+
+ @Override
+ public InternalRow read(Decoder decoder, Object reuse) throws IOException {
Review Comment:
Shouldn't the logic here be:
* Iterate over the Avro schema. For each branch, get the field ID from the
Avro schema annotation.
* The assumption is that the Avro union schema preserves all union branches,
even those that are not projected, so we still need to determine whether each
field is projected. This can be done by looking up the field ID from the step
above in the expected Iceberg schema. If the field ID is projected, populate
the InternalRow index using the next suitable reader (hopefully the reader
order is already set up in `AvroSchemaWithTypeVisitor` to match the expected
projection).
* If the field ID is not projected, skip it.
The above logic can be split between the constructor and the `read` method
for efficiency.
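
As a rough illustration of the constructor-side half of this, here is a minimal sketch that uses plain integers in place of real Avro and Iceberg objects. The class `UnionProjectionSketch`, the method `mapBranchesToRowPositions`, and the `NOT_PROJECTED` sentinel are hypothetical names for illustration only, not existing Iceberg or Avro APIs. It assumes each union branch carries a field ID (with `null` standing in for the NULL branch) and that the expected struct is given as a list of field IDs in row order.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: not part of the PR, Iceberg, or Avro.
class UnionProjectionSketch {
  // Sentinel for a branch whose value is not stored in the returned row.
  static final int NOT_PROJECTED = -1;

  /**
   * Computes, for every union branch, the position of its value in the returned row.
   *
   * @param branchFieldIds field ID of each union branch, in Avro schema order;
   *     null marks the NULL branch (assumption: NULL carries no field ID)
   * @param projectedFieldIds field IDs of the expected struct, in row order
   * @return one entry per branch: the row position, or NOT_PROJECTED
   */
  static int[] mapBranchesToRowPositions(
      List<Integer> branchFieldIds, List<Integer> projectedFieldIds) {
    // Index the expected struct once so each branch lookup is O(1).
    Map<Integer, Integer> rowPositionById = new HashMap<>();
    for (int pos = 0; pos < projectedFieldIds.size(); pos++) {
      rowPositionById.put(projectedFieldIds.get(pos), pos);
    }

    int[] branchToRowPosition = new int[branchFieldIds.size()];
    for (int branch = 0; branch < branchFieldIds.size(); branch++) {
      Integer fieldId = branchFieldIds.get(branch);
      if (fieldId == null || !rowPositionById.containsKey(fieldId)) {
        // NULL branch or unprojected branch: nothing to store in the row.
        branchToRowPosition[branch] = NOT_PROJECTED;
      } else {
        branchToRowPosition[branch] = rowPositionById.get(fieldId);
      }
    }
    return branchToRowPosition;
  }

  public static void main(String[] args) {
    // Union of [null, id 1, id 2, id 3]; the projection keeps id 3, then id 1.
    int[] mapping =
        mapBranchesToRowPositions(Arrays.asList(null, 1, 2, 3), Arrays.asList(3, 1));
    System.out.println(Arrays.toString(mapping)); // prints [-1, 1, -1, 0]
  }
}
```

With a mapping like this computed once in the constructor, `read` would only need to decode the branch index, look up its row position, and either store the value or discard it. The sketch leaves out the `tag` field and the per-branch readers.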
@rdblue Let me know if this matches your understanding.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]