shardulm94 commented on a change in pull request #4242:
URL: https://github.com/apache/iceberg/pull/4242#discussion_r827529615
##########
File path: core/src/main/java/org/apache/iceberg/avro/AvroSchemaVisitor.java
##########
@@ -52,8 +52,21 @@
case UNION:
List<Schema> types = schema.getTypes();
List<T> options = Lists.newArrayListWithExpectedSize(types.size());
- for (Schema type : types) {
- options.add(visit(type, visitor));
+ if (AvroSchemaUtil.isOptionSchema(schema)) {
+ for (Schema type : types) {
+ options.add(visit(type, visitor));
+ }
+ } else {
+ // complex union case
Review comment:
It seems we are implementing business logic in the visitor. Ideally, the
visitor should only traverse the Schema tree and leave business logic to the
implementation. For example, converting the union to a struct and assigning
field names like "field${i}" should be handled in the implementation class's
`union` method.
On a side note, I removed this piece of code and `TestUnionSchemaConversion`
still ran just fine. Is this code really needed?
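A rough sketch of the separation I have in mind, using a toy schema model
rather than the real classes. `Node`, `Visitor`, and `ToTypeString` are
hypothetical names made up for illustration; they are not the Avro or Iceberg
API. The `visit` method only walks the tree, and the decision to turn a
non-option union into a struct lives entirely in the implementation's `union`
method:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model, NOT the Avro or Iceberg API: a node is a primitive or a union of nodes.
final class UnionVisitorSketch {
  static final class Node {
    final String name;          // e.g. "null", "int", "string", or "union"
    final List<Node> branches;  // empty for primitives

    Node(String name, Node... branches) {
      this.name = name;
      this.branches = Arrays.asList(branches);
    }
  }

  // Callbacks implemented by the class that carries the business logic.
  interface Visitor<T> {
    T primitive(Node node);

    T union(Node union, List<T> options);
  }

  // Traversal only: visit every branch, then hand the results to the visitor.
  static <T> T visit(Node node, Visitor<T> visitor) {
    if (node.branches.isEmpty()) {
      return visitor.primitive(node);
    }
    List<T> options = new ArrayList<>(node.branches.size());
    for (Node branch : node.branches) {
      options.add(visit(branch, visitor));
    }
    return visitor.union(node, options);
  }

  // Business logic: option unions unwrap, any other union becomes a struct description.
  static final class ToTypeString implements Visitor<String> {
    @Override
    public String primitive(Node node) {
      return "null".equals(node.name) ? null : node.name;
    }

    @Override
    public String union(Node union, List<String> options) {
      boolean isOption = options.size() == 2 && options.contains(null);
      if (isOption) {
        return options.get(0) == null ? options.get(1) : options.get(0);
      }
      StringBuilder struct = new StringBuilder("struct<tag: int");
      int fieldIndex = 0;
      for (String option : options) {
        if (option != null) {
          struct.append(", field").append(fieldIndex++).append(": ").append(option);
        }
      }
      return struct.append(">").toString();
    }
  }
}
```

With this shape, a second visitor implementation could choose a different
struct layout or reject complex unions outright without touching `visit`.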
##########
File path: core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java
##########
@@ -154,6 +154,27 @@ public static boolean isOptionSchema(Schema schema) {
return false;
}
+ /**
+ * This method decides whether a schema is of type union and is complex union and is optional
+ *
+ * Complex union: the number of options in union not equals to 2
+ * Optional: null is present in union
+ *
+ * @param schema input schema
+ * @return true if schema is complex union and it is optional
+ */
+ public static boolean isOptionalComplexUnion(Schema schema) {
+ if (schema.getType() == UNION && schema.getTypes().size() != 2) {
Review comment:
What happens if it receives a schema like ["null"]? This function regards
it as an optional complex union. Is this expected? Should we instead check for
`schema.getTypes().size() > 2`? What is the impact on the readers when this
returns true?
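To make the edge case concrete, here is a small sketch that models a union as
a list of Avro type names instead of an `org.apache.avro.Schema`. The class and
method names are hypothetical, and I am assuming from the Javadoc that
"optional" means `null` is one of the branches:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Toy stand-in: a union is a list of Avro type names, not an org.apache.avro.Schema.
final class ComplexUnionCheckSketch {
  // Mirrors the condition in the diff: size is not 2 and null is one of the branches.
  static boolean sizeNotTwo(List<String> unionTypes) {
    return unionTypes.size() != 2 && unionTypes.contains("null");
  }

  // The alternative raised above: require strictly more than two branches.
  static boolean sizeGreaterThanTwo(List<String> unionTypes) {
    return unionTypes.size() > 2 && unionTypes.contains("null");
  }

  public static void main(String[] args) {
    List<String> onlyNull = Collections.singletonList("null");
    List<String> option = Arrays.asList("null", "int");
    List<String> complex = Arrays.asList("null", "int", "string");

    for (List<String> union : Arrays.asList(onlyNull, option, complex)) {
      System.out.println(union + " size != 2: " + sizeNotTwo(union)
          + ", size > 2: " + sizeGreaterThanTwo(union));
    }
  }
}
```

The two checks only disagree on unions with fewer than two branches, such as
["null"], which is exactly the case I am asking about.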
##########
File path: core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java
##########
@@ -79,11 +79,25 @@
private static <T> T visitUnion(Type type, Schema union, AvroSchemaWithTypeVisitor<T> visitor) {
List<Schema> types = union.getTypes();
List<T> options = Lists.newArrayListWithExpectedSize(types.size());
- for (Schema branch : types) {
- if (branch.getType() == Schema.Type.NULL) {
- options.add(visit((Type) null, branch, visitor));
- } else {
- options.add(visit(type, branch, visitor));
+
+ // simple union case
+ if (AvroSchemaUtil.isOptionSchema(union)) {
+ for (Schema branch : types) {
+ if (branch.getType() == Schema.Type.NULL) {
+ options.add(visit((Type) null, branch, visitor));
+ } else {
+ options.add(visit(type, branch, visitor));
+ }
+ }
+ } else { // complex union case
+ int index = 1;
+ for (Schema branch : types) {
+ if (branch.getType() == Schema.Type.NULL) {
+ options.add(visit((Type) null, branch, visitor));
+ } else {
+ options.add(visit(type.asStructType().fields().get(index).type(), branch, visitor));
Review comment:
Same comment as on `AvroSchemaVisitor`: business logic is being introduced
in the visitor.
##########
File path: core/src/main/java/org/apache/iceberg/avro/PruneColumns.java
##########
@@ -323,4 +325,19 @@ private static Schema makeEmptyCopy(Schema field) {
private static boolean isOptionSchemaWithNonNullFirstOption(Schema schema) {
return AvroSchemaUtil.isOptionSchema(schema) &&
schema.getTypes().get(0).getType() != Schema.Type.NULL;
}
+
+ // for primitive types, the visitResult will be null, we want to reuse the primitive types from the original
+ // schema, while for nested types, we want to use the visitResult because they have content from the previous
+ // recursive calls.
+ private static Schema copyUnion(Schema record, List<Schema> visitResults) {
Review comment:
It does not look like we prune any columns in the complex union case. I
tried modifying `TestSparkAvroUnions` to test projecting a single field from
the complex union and it fails (error below). There are a couple of ways we can
go about column pruning:
1. We implement column pruning for complex unions. It should be okay to
implement this as a followup PR, but in that case I think we should throw a
better error message letting the user know that projecting fields within a
complex union is not supported for now.
2. We choose not to prune columns inside complex unions and instead let the
engine handle the pruning. E.g. you can see a comment in `map()` =>
`// right now, maps can't be selected without values`. The `ReadBuilder`s
might need to be updated to account for this change.
```
index (1) must be less than size (1)
java.lang.IndexOutOfBoundsException: index (1) must be less than size (1)
    at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkElementIndex(Preconditions.java:1355)
    at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkElementIndex(Preconditions.java:1337)
    at org.apache.iceberg.relocated.com.google.common.collect.SingletonImmutableList.get(SingletonImmutableList.java:44)
    at org.apache.iceberg.avro.AvroSchemaWithTypeVisitor.visitUnion(AvroSchemaWithTypeVisitor.java:98)
    at org.apache.iceberg.avro.AvroSchemaWithTypeVisitor.visit(AvroSchemaWithTypeVisitor.java:41)
    at org.apache.iceberg.avro.AvroSchemaWithTypeVisitor.visitRecord(AvroSchemaWithTypeVisitor.java:71)
    at org.apache.iceberg.avro.AvroSchemaWithTypeVisitor.visit(AvroSchemaWithTypeVisitor.java:38)
    at org.apache.iceberg.avro.AvroSchemaWithTypeVisitor.visit(AvroSchemaWithTypeVisitor.java:32)
    at org.apache.iceberg.spark.data.SparkAvroReader.<init>(SparkAvroReader.java:57)
    at org.apache.iceberg.spark.data.SparkAvroReader.<init>(SparkAvroReader.java:50)
    at org.apache.iceberg.avro.Avro$ReadBuilder.lambda$build$1(Avro.java:652)
    at org.apache.iceberg.avro.ProjectionDatumReader.newDatumReader(ProjectionDatumReader.java:79)
    at org.apache.iceberg.avro.ProjectionDatumReader.setSchema(ProjectionDatumReader.java:69)
    at org.apache.avro.file.DataFileStream.initialize(DataFileStream.java:133)
    at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:130)
    at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:122)
    at org.apache.avro.file.DataFileReader.openReader(DataFileReader.java:66)
    at org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:100)
    at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:77)
    at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:37)
    at org.apache.iceberg.relocated.com.google.common.collect.Lists.newArrayList(Lists.java:133)
    at org.apache.iceberg.spark.data.TestSparkAvroUnions.writeAndValidateRequiredComplexUnion(TestSparkAvroUnions.java:75)
```
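If we go with the first option and defer pruning to a followup, the guard
could look roughly like the sketch below. It is a toy version that works on
plain strings; `ComplexUnionProjectionGuard` and its methods are hypothetical
names, and it assumes the struct layout from the `SchemaToType` change in this
PR (one `tag` field plus one field per non-null branch):

```java
import java.util.Arrays;
import java.util.List;

// Toy stand-in using type and field names as strings; not the Iceberg or Avro API.
final class ComplexUnionProjectionGuard {
  // Assumed layout: one "tag" field plus one field per non-null union branch.
  static int expectedFieldCount(List<String> unionBranches) {
    int nonNull = 0;
    for (String branch : unionBranches) {
      if (!"null".equals(branch)) {
        nonNull++;
      }
    }
    return nonNull + 1;
  }

  // Fails early with a descriptive message instead of an IndexOutOfBoundsException.
  static void checkFullyProjected(List<String> unionBranches, List<String> projectedFields) {
    int expected = expectedFieldCount(unionBranches);
    if (projectedFields.size() != expected) {
      throw new UnsupportedOperationException(String.format(
          "Projecting fields within a complex union is not supported: expected %d fields but got %s",
          expected, projectedFields));
    }
  }

  public static void main(String[] args) {
    List<String> branches = Arrays.asList("int", "string");
    // Full projection: passes silently.
    checkFullyProjected(branches, Arrays.asList("tag", "field0", "field1"));
    try {
      // Pruned projection: rejected with a readable message.
      checkFullyProjected(branches, Arrays.asList("field0"));
    } catch (UnsupportedOperationException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

In the real code the check would sit wherever the projected struct type and the
Avro union are first seen together, which I have not tried to pin down here.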
##########
File path: spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroUnions.java
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.spark.data;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.avro.AvroIterable;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestSparkAvroUnions {
Review comment:
Can we add a test case here that checks the output when only some fields
within the union are projected?
##########
File path: core/src/main/java/org/apache/iceberg/avro/SchemaToType.java
##########
@@ -105,13 +106,27 @@ public Type record(Schema record, List<String> names, List<Type> fieldTypes) {
@Override
public Type union(Schema union, List<Type> options) {
- Preconditions.checkArgument(AvroSchemaUtil.isOptionSchema(union),
- "Unsupported type: non-option union: %s", union);
- // records, arrays, and maps will check nullability later
- if (options.get(0) == null) {
- return options.get(1);
+ if (AvroSchemaUtil.isOptionSchema(union)) {
+ // Optional simple union
+ // records, arrays, and maps will check nullability later
+ if (options.get(0) == null) {
+ return options.get(1);
+ } else {
+ return options.get(0);
+ }
} else {
- return options.get(0);
+ // Complex union
+ List<Types.NestedField> newFields = new ArrayList<>();
+ newFields.add(Types.NestedField.required(allocateId(), "tag", Types.IntegerType.get()));
+
+ int tagIndex = 0;
+ for (Type type : options) {
+ if (type != null) {
+ newFields.add(Types.NestedField.optional(allocateId(), "field" +  tagIndex++, type));
+ }
Review comment:
Can you add the reasoning behind why the field names `tag` and
`field${index}` were chosen when converting to struct? Spark's Avro datasource
uses `member${index}` while Hive's `extract_union` UDF uses `tag_${index}`.
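For comparison, a tiny sketch that prints the member names each convention
would produce for a union with two non-null branches. `UnionFieldNamingSketch`
and `memberNames` are hypothetical helpers; the prefixes are the ones quoted in
this comment, and I am assuming zero-based indexes for all three without having
re-checked the Spark and Hive sources:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: generates member names for a given prefix convention.
final class UnionFieldNamingSketch {
  // Builds prefix0, prefix1, ... for the given number of non-null branches.
  static List<String> memberNames(String prefix, int branchCount) {
    List<String> names = new ArrayList<>(branchCount);
    for (int i = 0; i < branchCount; i++) {
      names.add(prefix + i);
    }
    return names;
  }

  public static void main(String[] args) {
    System.out.println("this PR: " + memberNames("field", 2));
    System.out.println("Spark:   " + memberNames("member", 2));
    System.out.println("Hive:    " + memberNames("tag_", 2));
  }
}
```

Whichever prefix we pick, a user moving data between engines will see a
different column name for the same union member, so it would help to document
the choice.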
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]