wmoustafa commented on code in PR #5704:
URL: https://github.com/apache/iceberg/pull/5704#discussion_r979466542


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java:
##########
@@ -285,4 +300,87 @@ protected void set(InternalRow struct, int pos, Object value) {
       }
     }
   }
+
+  private static class ComplexUnionReader implements ValueReader<InternalRow> {
+    private static final String UNION_TAG_FIELD_NAME = "tag";
+    private final Schema schema;
+    private final List<Schema> branches;
+    private final ValueReader[] readers;
+    private int nullIndex;
+    private final int[] projectedFieldIdsToIdxInReturnedRow;
+    private boolean isTagFieldProjected;
+    private int numOfFieldsInReturnedRow;
+    private int nullTypeIndex;
+
+    private ComplexUnionReader(Schema schema, List<ValueReader<?>> readers, Type expected) {
+      this.schema = schema;
+      this.branches = schema.getTypes();
+      this.readers = new ValueReader[readers.size()];
+      for (int i = 0; i < this.readers.length; i += 1) {
+        this.readers[i] = readers.get(i);
+      }
+
+      // checking if NULL type exists in Avro union schema
+      this.nullTypeIndex = -1;
+      for (int i = 0; i < this.schema.getTypes().size(); i++) {
+        Schema alt = this.schema.getTypes().get(i);
+        if (Objects.equals(alt.getType(), Schema.Type.NULL)) {
+          this.nullTypeIndex = i;
+          break;
+        }
+      }
+
+      // Creating an integer array to track the mapping between the index of fields to be projected
+      // and the index of the value for the field stored in the returned row,
+      // if the value for a field equals to -1, it means the value of this field should not be
+      // stored
+      // in the returned row
+      int numberOfTypes =
+          this.nullTypeIndex == -1
+              ? this.schema.getTypes().size()
+              : this.schema.getTypes().size() - 1;
+      this.projectedFieldIdsToIdxInReturnedRow = new int[numberOfTypes];
+      Arrays.fill(this.projectedFieldIdsToIdxInReturnedRow, -1);
+      this.numOfFieldsInReturnedRow = 0;
+      this.isTagFieldProjected = false;
+      for (Types.NestedField expectedStructField : expected.asStructType().fields()) {
+        String fieldName = expectedStructField.name();
+        if (fieldName.equals(UNION_TAG_FIELD_NAME)) {
+          this.isTagFieldProjected = true;
+          this.numOfFieldsInReturnedRow++;
+          continue;
+        }
+        int projectedFieldIndex = Integer.valueOf(fieldName.substring(5));
+        this.projectedFieldIdsToIdxInReturnedRow[projectedFieldIndex] =
+            this.numOfFieldsInReturnedRow++;
+      }
+    }
+
+    @Override
+    public InternalRow read(Decoder decoder, Object reuse) throws IOException {

Review Comment:
   Shouldn't the logic here be:
   * Iterate over the branches of the Avro union schema. For each branch, get
     the field ID from the Avro schema annotation.
   * The assumption is that the Avro union schema preserves all union branches,
     even those that are not projected, so we still need to determine whether
     each branch is projected. This can be done by looking up the field ID from
     the previous step in the expected Iceberg schema. If the field ID is
     projected, populate the corresponding `InternalRow` index using the next
     suitable reader (hopefully the reader order is already set up in
     `AvroSchemaWithTypeVisitor` to match the expected projection).
   * If the field ID is not projected, skip the branch.

   The above logic can be split between the constructor and the `read` method
   for efficiency.
   @rdblue Let me know if this matches your understanding.
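
   As a rough illustration of the split suggested above, here is a minimal,
   library-free sketch. It is not the PR's code and uses no Iceberg or Avro
   APIs: `UnionProjectionSketch`, `branchFieldIds`, and `projectedFieldIds`
   are hypothetical names, field IDs are assumed to have already been
   extracted from the Avro schema annotations, and the `tag` field is left
   out. The constructor precomputes, per union branch, the position in the
   returned row (or -1 when the branch is not projected); `read` only does an
   array lookup.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: maps Avro union branches to positions in the returned row. */
final class UnionProjectionSketch {
  // For each union branch (in Avro order): row position, or -1 if not projected.
  private final int[] branchToRowPos;
  private final int rowWidth;

  /**
   * @param branchFieldIds field ID of each union branch in Avro order; null marks the NULL branch
   * @param projectedFieldIds field IDs of the expected (projected) schema, in output order
   */
  UnionProjectionSketch(List<Integer> branchFieldIds, List<Integer> projectedFieldIds) {
    // Look up table from projected field ID to its position in the returned row.
    Map<Integer, Integer> idToRowPos = new HashMap<>();
    for (int pos = 0; pos < projectedFieldIds.size(); pos++) {
      idToRowPos.put(projectedFieldIds.get(pos), pos);
    }

    this.rowWidth = projectedFieldIds.size();
    this.branchToRowPos = new int[branchFieldIds.size()];
    for (int branch = 0; branch < branchFieldIds.size(); branch++) {
      Integer fieldId = branchFieldIds.get(branch);
      Integer pos = fieldId == null ? null : idToRowPos.get(fieldId);
      // Branches that are NULL or not projected get -1 and are skipped at read time.
      this.branchToRowPos[branch] = pos == null ? -1 : pos;
    }
  }

  /**
   * Stand-in for read(): branchIndex is the union index decoded from the data,
   * value is the already-decoded branch value.
   */
  Object[] read(int branchIndex, Object value) {
    Object[] row = new Object[rowWidth];
    int pos = branchToRowPos[branchIndex];
    if (pos >= 0) {
      row[pos] = value;
    }
    return row;
  }
}
```

   For example, with branches `[null, 10, 11, 12]` and a projection of
   `[12, 10]`, a value read from branch 3 lands in row position 0, and a value
   from branch 2 is dropped. In an actual reader the bytes of an unprojected
   branch would presumably still have to be consumed from the decoder, which
   this sketch does not model.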



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

