shardulm94 commented on a change in pull request #4242:
URL: https://github.com/apache/iceberg/pull/4242#discussion_r827529615



##########
File path: core/src/main/java/org/apache/iceberg/avro/AvroSchemaVisitor.java
##########
@@ -52,8 +52,21 @@
       case UNION:
         List<Schema> types = schema.getTypes();
         List<T> options = Lists.newArrayListWithExpectedSize(types.size());
-        for (Schema type : types) {
-          options.add(visit(type, visitor));
+        if (AvroSchemaUtil.isOptionSchema(schema)) {
+          for (Schema type : types) {
+            options.add(visit(type, visitor));
+          }
+        } else {
+          // complex union case

Review comment:
       Seems like we are implementing business logic in the visitor. Ideally 
the visitor should just handle traversing the Schema tree and leave business 
logic to the implementation. For example, converting the union to a struct and 
assigning field names like "field${i}" should be handled in the implementation 
class's `union` method.
   
   On a side note, I removed this piece of code and `TestUnionSchemaConversion` 
ran just fine. Is this code really needed?
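
The separation suggested above could look roughly like the following. This is a minimal sketch, not Iceberg's code: `Node`, `SketchVisitor`, and `DescribeVisitor` are hypothetical stand-ins for the Avro `Schema` and the visitor classes, and the struct is rendered as a plain string. The traversal only walks the branches, while the `union` callback decides how a complex union is converted and named.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an Avro schema node; not org.apache.avro.Schema.
final class Node {
  final String type;          // e.g. "union", "int", "string", "null"
  final List<Node> branches;  // non-empty only for unions

  Node(String type, List<Node> branches) {
    this.type = type;
    this.branches = branches;
  }

  static Node primitive(String type) {
    return new Node(type, List.of());
  }

  static Node union(Node... branches) {
    return new Node("union", List.of(branches));
  }
}

// Traversal only: visit every branch, then hand the results to the implementation.
abstract class SketchVisitor<T> {
  abstract T union(Node union, List<T> options);

  abstract T primitive(Node primitive);

  static <T> T visit(Node node, SketchVisitor<T> visitor) {
    if (node.type.equals("union")) {
      List<T> options = new ArrayList<>(node.branches.size());
      for (Node branch : node.branches) {
        options.add(visit(branch, visitor));
      }
      return visitor.union(node, options);
    }
    return visitor.primitive(node);
  }
}

// The implementation owns the union-to-struct conversion and the field naming.
final class DescribeVisitor extends SketchVisitor<String> {
  @Override
  String union(Node union, List<String> options) {
    // Option union: exactly two branches, one of which is null.
    if (options.size() == 2 && options.contains(null)) {
      return options.get(0) == null ? options.get(1) : options.get(0);
    }
    // Complex union: a tag field plus one field per non-null branch.
    StringBuilder struct = new StringBuilder("struct<tag: int");
    int index = 0;
    for (String option : options) {
      if (option != null) {
        struct.append(", field").append(index++).append(": ").append(option);
      }
    }
    return struct.append(">").toString();
  }

  @Override
  String primitive(Node primitive) {
    // A null branch contributes no type, mirroring the null entries in the diff.
    return primitive.type.equals("null") ? null : primitive.type;
  }
}
```

With this shape, changing the naming scheme touches only `DescribeVisitor.union`, and the traversal in `visit` stays the same for option unions and complex unions.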
   

##########
File path: core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java
##########
@@ -79,11 +79,25 @@
   private static <T> T visitUnion(Type type, Schema union, AvroSchemaWithTypeVisitor<T> visitor) {
     List<Schema> types = union.getTypes();
     List<T> options = Lists.newArrayListWithExpectedSize(types.size());
-    for (Schema branch : types) {
-      if (branch.getType() == Schema.Type.NULL) {
-        options.add(visit((Type) null, branch, visitor));
-      } else {
-        options.add(visit(type, branch, visitor));
+
+    // simple union case
+    if (AvroSchemaUtil.isOptionSchema(union)) {
+      for (Schema branch : types) {
+        if (branch.getType() == Schema.Type.NULL) {
+          options.add(visit((Type) null, branch, visitor));
+        } else {
+          options.add(visit(type, branch, visitor));
+        }
+      }
+    } else { // complex union case
+      int index = 1;
+      for (Schema branch : types) {
+        if (branch.getType() == Schema.Type.NULL) {
+          options.add(visit((Type) null, branch, visitor));
+        } else {
+          options.add(visit(type.asStructType().fields().get(index).type(), branch, visitor));

Review comment:
       Similar comment to `AvroSchemaVisitor` with business logic being 
introduced in the visitor.

##########
File path: spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroUnions.java
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.spark.data;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.avro.AvroIterable;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestSparkAvroUnions {

Review comment:
       Can we add a test case here to check for the output when only some 
fields within the union are projected?

##########
File path: core/src/main/java/org/apache/iceberg/avro/SchemaToType.java
##########
@@ -105,13 +106,27 @@ public Type record(Schema record, List<String> names, List<Type> fieldTypes) {
 
   @Override
   public Type union(Schema union, List<Type> options) {
-    Preconditions.checkArgument(AvroSchemaUtil.isOptionSchema(union),
-        "Unsupported type: non-option union: %s", union);
-    // records, arrays, and maps will check nullability later
-    if (options.get(0) == null) {
-      return options.get(1);
+    if (AvroSchemaUtil.isOptionSchema(union)) {
+      // Optional simple union
+      // records, arrays, and maps will check nullability later
+      if (options.get(0) == null) {
+        return options.get(1);
+      } else {
+        return options.get(0);
+      }
     } else {
-      return options.get(0);
+      // Complex union
+      List<Types.NestedField> newFields = new ArrayList<>();
+      newFields.add(Types.NestedField.required(allocateId(), "tag", Types.IntegerType.get()));
+
+      int tagIndex = 0;
+      for (Type type : options) {
+        if (type != null) {
+          newFields.add(Types.NestedField.optional(allocateId(), "field" + tagIndex++, type));
+        }

Review comment:
       Can you add the reasoning behind why the field names `tag` and 
`field${index}` were chosen when converting to struct? Spark's Avro datasource 
uses `member${index}` while Hive's `extract_union` UDF uses `tag_${index}`.
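
For reference, the naming produced by the diff above can be reproduced in isolation. This is only a sketch: `ComplexUnionNaming` and `structFieldNames` are hypothetical names, and union branches are modelled as plain type-name strings rather than Avro schemas.

```java
import java.util.ArrayList;
import java.util.List;

final class ComplexUnionNaming {
  // Returns the struct field names the diff would assign to a complex union.
  static List<String> structFieldNames(List<String> branchTypes) {
    List<String> names = new ArrayList<>();
    names.add("tag"); // required int field recording which branch is set
    int tagIndex = 0;
    for (String branchType : branchTypes) {
      // The null branch gets no field, so indexes count non-null branches only.
      if (!"null".equals(branchType)) {
        names.add("field" + tagIndex++);
      }
    }
    return names;
  }
}
```

For the union `["null", "int", "string"]` this yields `tag`, `field0`, `field1`. The index counts only non-null branches, so moving `null` within the union does not shift the field names.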

##########
File path: core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java
##########
@@ -154,6 +154,27 @@ public static boolean isOptionSchema(Schema schema) {
     return false;
   }
 
+  /**
+   * This method decides whether a schema is of type union and is complex union and is optional
+   *
+   * Complex union: the number of options in union not equals to 2
+   * Optional: null is present in union
+   *
+   * @param schema input schema
+   * @return true if schema is complex union and it is optional
+   */
+  public static boolean isOptionalComplexUnion(Schema schema) {
+    if (schema.getType() == UNION && schema.getTypes().size() != 2) {

Review comment:
       What happens if it receives a schema like `["null"]`? This function 
regards it as an optional complex union. Is this expected? Should we instead 
check for `schema.getTypes().size() > 2`? What is the impact on the readers 
when this returns true?
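
The edge case can be reproduced without Avro. In the sketch below, union branches are modelled as a list of type-name strings, `UnionSizeCheck` is a hypothetical helper, and the null-presence check is assumed from the Javadoc ("Optional: null is present in union") because the method body is not shown in the hunk.

```java
import java.util.List;

final class UnionSizeCheck {
  // Mirrors the condition in the diff: size is anything but 2, and null is present.
  static boolean sizeNotEqualToTwo(List<String> branchTypes) {
    return branchTypes.size() != 2 && branchTypes.contains("null");
  }

  // The alternative raised in the review: require more than two branches.
  static boolean sizeGreaterThanTwo(List<String> branchTypes) {
    return branchTypes.size() > 2 && branchTypes.contains("null");
  }
}
```

Under these assumptions the two predicates agree on `["null", "int", "string"]` (both true) and on `["null", "int"]` (both false). They differ only on the single-branch union `["null"]`, which the `!= 2` form accepts and the `> 2` form rejects.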

##########
File path: core/src/main/java/org/apache/iceberg/avro/PruneColumns.java
##########
@@ -323,4 +325,19 @@ private static Schema makeEmptyCopy(Schema field) {
   private static boolean isOptionSchemaWithNonNullFirstOption(Schema schema) {
     return AvroSchemaUtil.isOptionSchema(schema) && schema.getTypes().get(0).getType() != Schema.Type.NULL;
   }
+
+  // for primitive types, the visitResult will be null, we want to reuse the primitive types from the original
+  // schema, while for nested types, we want to use the visitResult because they have content from the previous
+  // recursive calls.
+  private static Schema copyUnion(Schema record, List<Schema> visitResults) {

Review comment:
       It does not look like we prune any columns in the complex union case. I 
tried modifying `TestSparkAvroUnions` to project a single field from the 
complex union and it fails (error below). There are a couple of ways we can 
handle column pruning:
   1. Implement column pruning for complex unions. It should be okay to do 
this in a follow-up PR, but in that case I think we should throw a clearer 
error telling the user that projecting fields within a complex union is not 
supported for now.
   2. Choose not to prune columns inside complex unions and instead let the 
engine handle the pruning. E.g. you can see a comment in `map()` => `// right 
now, maps can't be selected without values`. The `ReadBuilder`s might need to 
be updated to account for this change. We also need to make sure that the 
schema advertised by Iceberg to the engine does not prune these fields either. 
In Spark land, this would be the schema returned through `Scan.readSchema()`.
   
   ```
   index (1) must be less than size (1)
   java.lang.IndexOutOfBoundsException: index (1) must be less than size (1)
        at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkElementIndex(Preconditions.java:1355)
        at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkElementIndex(Preconditions.java:1337)
        at org.apache.iceberg.relocated.com.google.common.collect.SingletonImmutableList.get(SingletonImmutableList.java:44)
        at org.apache.iceberg.avro.AvroSchemaWithTypeVisitor.visitUnion(AvroSchemaWithTypeVisitor.java:98)
        at org.apache.iceberg.avro.AvroSchemaWithTypeVisitor.visit(AvroSchemaWithTypeVisitor.java:41)
        at org.apache.iceberg.avro.AvroSchemaWithTypeVisitor.visitRecord(AvroSchemaWithTypeVisitor.java:71)
        at org.apache.iceberg.avro.AvroSchemaWithTypeVisitor.visit(AvroSchemaWithTypeVisitor.java:38)
        at org.apache.iceberg.avro.AvroSchemaWithTypeVisitor.visit(AvroSchemaWithTypeVisitor.java:32)
        at org.apache.iceberg.spark.data.SparkAvroReader.<init>(SparkAvroReader.java:57)
        at org.apache.iceberg.spark.data.SparkAvroReader.<init>(SparkAvroReader.java:50)
        at org.apache.iceberg.avro.Avro$ReadBuilder.lambda$build$1(Avro.java:652)
        at org.apache.iceberg.avro.ProjectionDatumReader.newDatumReader(ProjectionDatumReader.java:79)
        at org.apache.iceberg.avro.ProjectionDatumReader.setSchema(ProjectionDatumReader.java:69)
        at org.apache.avro.file.DataFileStream.initialize(DataFileStream.java:133)
        at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:130)
        at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:122)
        at org.apache.avro.file.DataFileReader.openReader(DataFileReader.java:66)
        at org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:100)
        at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:77)
        at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:37)
        at org.apache.iceberg.relocated.com.google.common.collect.Lists.newArrayList(Lists.java:133)
        at org.apache.iceberg.spark.data.TestSparkAvroUnions.writeAndValidateRequiredComplexUnion(TestSparkAvroUnions.java:75)
   ```
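
If option 1 is deferred to a follow-up, the interim guard could be as small as the sketch below. `UnionProjectionGuard` and `checkFullProjection` are hypothetical names, not part of the PR, and the expected field count assumes the struct layout from `SchemaToType` (one `tag` field plus one field per non-null branch).

```java
final class UnionProjectionGuard {
  // structFieldCount: fields present in the projected struct type.
  // nonNullBranchCount: non-null branches in the Avro union being read.
  static void checkFullProjection(int structFieldCount, int nonNullBranchCount) {
    int expected = nonNullBranchCount + 1; // one field per branch, plus "tag"
    if (structFieldCount != expected) {
      throw new UnsupportedOperationException(String.format(
          "Projecting fields within a complex union is not supported: "
              + "expected %d struct fields but the projection has %d",
          expected, structFieldCount));
    }
  }
}
```

A check like this before indexing into `type.asStructType().fields()` would replace the `IndexOutOfBoundsException` above with a message that names the unsupported operation.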




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]