rdblue commented on a change in pull request #1046:
URL: https://github.com/apache/incubator-iceberg/pull/1046#discussion_r430012232



##########
File path: api/src/main/java/org/apache/iceberg/types/Types.java
##########
@@ -526,25 +526,25 @@ public static StructType of(List<NestedField> fields) {
       return new StructType(fields, false);
     }
 
-    public static StructType of(List<NestedField> fields, boolean 
isUnionSchema) {
-      return new StructType(fields, isUnionSchema);
+    public static StructType of(List<NestedField> fields, boolean 
convertedFromUnionSchema) {
+      return new StructType(fields, convertedFromUnionSchema);

Review comment:
       I still don't think that Iceberg needs to model that this was converted. 
The conversion back to a union schema is not something that Iceberg should 
support. Iceberg must only _write_ Iceberg data files and those are not allowed 
to contain Avro unions. The use case that we aim to support is reading data 
files with existing unions, if I understand correctly.

##########
File path: core/src/main/java/org/apache/iceberg/avro/SchemaToType.java
##########
@@ -106,11 +106,27 @@ public Type record(Schema record, List<String> names, 
List<Type> fieldTypes) {
   public Type union(Schema union, List<Type> options) {
     Preconditions.checkArgument(AvroSchemaUtil.isOptionSchema(union),
         "Unsupported type: non-option union: %s", union);
-    // records, arrays, and maps will check nullability later
-    if (options.get(0) == null) {
-      return options.get(1);
-    } else {
+    if (options.size() == 1) {
       return options.get(0);
+    } else if (options.size() == 2) {
+      if (options.get(0) == null) {
+        return options.get(1);
+      } else {
+        return options.get(0);
+      }
+    } else {
+      // Convert complex unions to struct types where field names are member0, 
member1, etc.
+      // This is consistent with the behavior of the spark Avro SchemaConverter
+      List<Types.NestedField> fields = 
Lists.newArrayListWithExpectedSize(options.size());
+      for (int i = 0; i < options.size(); i += 1) {
+        Type fieldType = options.get(i);
+        if (fieldType == null) {

Review comment:
       I think I see. The reason is that all branches of the union are optional 
and there is no way to encode whether one branch (let alone only one branch) 
will be non-null?

##########
File path: 
core/src/test/java/org/apache/iceberg/avro/AvroDataUnionRecordTest.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.avro;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.FileAppender;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class AvroDataUnionRecordTest {
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  protected void writeAndValidate(
+      List<GenericData.Record> actualWrite,
+      List<GenericData.Record> expectedRead,
+      Schema icebergSchema) throws IOException {
+    File testFile = temp.newFile();
+    Assert.assertTrue("Delete should succeed", testFile.delete());
+
+    try (FileAppender<GenericData.Record> writer = 
Avro.write(Files.localOutput(testFile))
+        .schema(icebergSchema)
+        .named("test")
+        .build()) {
+      for (GenericData.Record rec : actualWrite) {
+        writer.add(rec);
+      }
+    }
+
+    List<GenericData.Record> rows;
+    try (AvroIterable<GenericData.Record> reader = 
Avro.read(Files.localInput(testFile))
+        .project(icebergSchema)
+        .build()) {
+      rows = Lists.newArrayList(reader);
+    }
+
+    for (int i = 0; i < expectedRead.size(); i += 1) {
+      AvroTestHelpers.assertEquals(icebergSchema.asStruct(), 
expectedRead.get(i), rows.get(i));
+    }
+  }
+
+  @Test
+  public void testMapOfUnionValues() throws IOException {
+    String schema1 = "{\n" +
+        "  \"name\": \"MapOfUnion\",\n" +
+        "  \"type\": \"record\",\n" +
+        "  \"fields\": [\n" +
+        "    {\n" +
+        "      \"name\": \"map\",\n" +
+        "      \"type\": [\n" +
+        "        \"null\",\n" +
+        "        {\n" +
+        "          \"type\": \"map\",\n" +
+        "          \"values\": [\n" +
+        "            \"null\",\n" +
+        "            \"boolean\",\n" +
+        "            \"int\",\n" +
+        "            \"long\",\n" +
+        "            \"float\",\n" +
+        "            \"double\",\n" +
+        "            \"bytes\",\n" +
+        "            \"string\"\n" +
+        "          ]\n" +
+        "        }\n" +
+        "      ]\n" +
+        "    }\n" +
+        "  ]\n" +
+        "}";
+    org.apache.avro.Schema avroSchema = new 
org.apache.avro.Schema.Parser().parse(schema1);
+    org.apache.iceberg.Schema icebergSchema = 
AvroSchemaUtil.toIceberg(avroSchema);
+    org.apache.avro.Schema avroSchemaUnionRecord = 
AvroSchemaUtil.convert(icebergSchema, "test");
+    org.apache.avro.Schema unionRecordSchema =
+        
avroSchemaUnionRecord.getFields().get(0).schema().getTypes().get(1).getValueType().getTypes().get(1);
+
+    List<GenericData.Record> expectedRead = new ArrayList<>();
+    List<GenericData.Record> actualWrite = new ArrayList<>();
+
+    for (long i = 0; i < 10; i++) {
+      Map<String, Object> map = new HashMap<>();
+      Map<String, Object> mapRead = new HashMap<>();
+      updateMapsForUnionSchema(unionRecordSchema, map, mapRead, i);
+      GenericData.Record recordRead = new GenericRecordBuilder(avroSchema)
+          .set("map", mapRead)
+          .build();
+      GenericData.Record record = new GenericRecordBuilder(avroSchema)
+          .set("map", map)
+          .build();
+      actualWrite.add(record);
+      expectedRead.add(recordRead);
+    }
+    writeAndValidate(actualWrite, expectedRead, icebergSchema);
+  }
+
+  private void updateMapsForUnionSchema(
+      org.apache.avro.Schema unionRecordSchema,
+      Map<String, Object> map,
+      Map<String, Object> mapRead,
+      Long index) {
+    map.put("boolean", index % 2 == 0);
+    map.put("int", index.intValue());
+    map.put("long", index);
+    map.put("float", index.floatValue());
+    map.put("double", index.doubleValue());
+    map.put("bytes", ByteBuffer.wrap(("bytes_" + index).getBytes()));
+    map.put("string", "string_" + index);
+
+    map.entrySet().stream().forEach(e -> {
+      String key = e.getKey();
+      GenericData.Record record = 
getGenericRecordForUnionType(unionRecordSchema, map, key);
+      mapRead.put(key, record);
+    });
+  }
+
+  private GenericData.Record getGenericRecordForUnionType(

Review comment:
       Data files with unions must be written by non-Iceberg producers because 
Iceberg will not write invalid data files. A union schema cannot contain 
Iceberg field IDs because there is no defined way to encode them -- union has 
no extra properties and while types could be replaced with a JSON object with 
properties, there isn't anything that pulls IDs out.
   
   For external data files, what would happen is the user would convert a 
schema and create a table from it, at which point the IDs get assigned. Next, 
the user would create a name mapping with that schema's names to Iceberg field 
IDs. That mapping is how we recover IDs from field names. That works because 
Avro is name-based so we can map its schema evolution into Iceberg.
   
   But name-based doesn't work with unions that are converted to structs 
because we are automatically assigning the names using a counter. If the 
definition of `member2` changes, we will still map it to the same Iceberg ID by 
name.

##########
File path: api/src/main/java/org/apache/iceberg/types/Types.java
##########
@@ -526,25 +526,25 @@ public static StructType of(List<NestedField> fields) {
       return new StructType(fields, false);
     }
 
-    public static StructType of(List<NestedField> fields, boolean 
isUnionSchema) {
-      return new StructType(fields, isUnionSchema);
+    public static StructType of(List<NestedField> fields, boolean 
convertedFromUnionSchema) {
+      return new StructType(fields, convertedFromUnionSchema);

Review comment:
       > there is no other way to pass this information for GenericAvroWriter
   
   I understand that there is no other way to pass the information. What I'm 
saying is that GenericAvroWriter cannot write non-option unions anyway. Iceberg 
must not write anything that is not allowed by the spec. This feature can be 
used to read existing Avro data files, but cannot be used to write Avro data 
files with unions.

##########
File path: 
core/src/test/java/org/apache/iceberg/avro/AvroDataUnionRecordTest.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.avro;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.FileAppender;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class AvroDataUnionRecordTest {
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  protected void writeAndValidate(
+      List<GenericData.Record> actualWrite,
+      List<GenericData.Record> expectedRead,
+      Schema icebergSchema) throws IOException {
+    File testFile = temp.newFile();
+    Assert.assertTrue("Delete should succeed", testFile.delete());
+
+    try (FileAppender<GenericData.Record> writer = 
Avro.write(Files.localOutput(testFile))
+        .schema(icebergSchema)
+        .named("test")
+        .build()) {
+      for (GenericData.Record rec : actualWrite) {
+        writer.add(rec);
+      }
+    }
+
+    List<GenericData.Record> rows;
+    try (AvroIterable<GenericData.Record> reader = 
Avro.read(Files.localInput(testFile))
+        .project(icebergSchema)
+        .build()) {
+      rows = Lists.newArrayList(reader);
+    }
+
+    for (int i = 0; i < expectedRead.size(); i += 1) {
+      AvroTestHelpers.assertEquals(icebergSchema.asStruct(), 
expectedRead.get(i), rows.get(i));
+    }
+  }
+
+  @Test
+  public void testMapOfUnionValues() throws IOException {
+    String schema1 = "{\n" +
+        "  \"name\": \"MapOfUnion\",\n" +
+        "  \"type\": \"record\",\n" +
+        "  \"fields\": [\n" +
+        "    {\n" +
+        "      \"name\": \"map\",\n" +
+        "      \"type\": [\n" +
+        "        \"null\",\n" +
+        "        {\n" +
+        "          \"type\": \"map\",\n" +
+        "          \"values\": [\n" +
+        "            \"null\",\n" +
+        "            \"boolean\",\n" +
+        "            \"int\",\n" +
+        "            \"long\",\n" +
+        "            \"float\",\n" +
+        "            \"double\",\n" +
+        "            \"bytes\",\n" +
+        "            \"string\"\n" +
+        "          ]\n" +
+        "        }\n" +
+        "      ]\n" +
+        "    }\n" +
+        "  ]\n" +
+        "}";
+    org.apache.avro.Schema avroSchema = new 
org.apache.avro.Schema.Parser().parse(schema1);
+    org.apache.iceberg.Schema icebergSchema = 
AvroSchemaUtil.toIceberg(avroSchema);
+    org.apache.avro.Schema avroSchemaUnionRecord = 
AvroSchemaUtil.convert(icebergSchema, "test");
+    org.apache.avro.Schema unionRecordSchema =
+        
avroSchemaUnionRecord.getFields().get(0).schema().getTypes().get(1).getValueType().getTypes().get(1);
+
+    List<GenericData.Record> expectedRead = new ArrayList<>();
+    List<GenericData.Record> actualWrite = new ArrayList<>();
+
+    for (long i = 0; i < 10; i++) {
+      Map<String, Object> map = new HashMap<>();
+      Map<String, Object> mapRead = new HashMap<>();
+      updateMapsForUnionSchema(unionRecordSchema, map, mapRead, i);
+      GenericData.Record recordRead = new GenericRecordBuilder(avroSchema)
+          .set("map", mapRead)
+          .build();
+      GenericData.Record record = new GenericRecordBuilder(avroSchema)
+          .set("map", map)
+          .build();
+      actualWrite.add(record);
+      expectedRead.add(recordRead);
+    }
+    writeAndValidate(actualWrite, expectedRead, icebergSchema);
+  }
+
+  private void updateMapsForUnionSchema(
+      org.apache.avro.Schema unionRecordSchema,
+      Map<String, Object> map,
+      Map<String, Object> mapRead,
+      Long index) {
+    map.put("boolean", index % 2 == 0);
+    map.put("int", index.intValue());
+    map.put("long", index);
+    map.put("float", index.floatValue());
+    map.put("double", index.doubleValue());
+    map.put("bytes", ByteBuffer.wrap(("bytes_" + index).getBytes()));
+    map.put("string", "string_" + index);
+
+    map.entrySet().stream().forEach(e -> {
+      String key = e.getKey();
+      GenericData.Record record = 
getGenericRecordForUnionType(unionRecordSchema, map, key);
+      mapRead.put(key, record);
+    });
+  }
+
+  private GenericData.Record getGenericRecordForUnionType(

Review comment:
       What do you mean the input stream has a union schema? The records that 
you're trying to write?
   
   Incoming records are required to match the write schema. Because the write 
schema cannot contain a union, the records also cannot contain a union. The 
union must be represented in memory as a struct of the union types.

##########
File path: 
core/src/test/java/org/apache/iceberg/avro/AvroDataUnionRecordTest.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.avro;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.FileAppender;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class AvroDataUnionRecordTest {
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  protected void writeAndValidate(
+      List<GenericData.Record> actualWrite,
+      List<GenericData.Record> expectedRead,
+      Schema icebergSchema) throws IOException {
+    File testFile = temp.newFile();
+    Assert.assertTrue("Delete should succeed", testFile.delete());
+
+    try (FileAppender<GenericData.Record> writer = 
Avro.write(Files.localOutput(testFile))
+        .schema(icebergSchema)
+        .named("test")
+        .build()) {
+      for (GenericData.Record rec : actualWrite) {
+        writer.add(rec);
+      }
+    }
+
+    List<GenericData.Record> rows;
+    try (AvroIterable<GenericData.Record> reader = 
Avro.read(Files.localInput(testFile))
+        .project(icebergSchema)
+        .build()) {
+      rows = Lists.newArrayList(reader);
+    }
+
+    for (int i = 0; i < expectedRead.size(); i += 1) {
+      AvroTestHelpers.assertEquals(icebergSchema.asStruct(), 
expectedRead.get(i), rows.get(i));
+    }
+  }
+
+  @Test
+  public void testMapOfUnionValues() throws IOException {
+    String schema1 = "{\n" +
+        "  \"name\": \"MapOfUnion\",\n" +
+        "  \"type\": \"record\",\n" +
+        "  \"fields\": [\n" +
+        "    {\n" +
+        "      \"name\": \"map\",\n" +
+        "      \"type\": [\n" +
+        "        \"null\",\n" +
+        "        {\n" +
+        "          \"type\": \"map\",\n" +
+        "          \"values\": [\n" +
+        "            \"null\",\n" +
+        "            \"boolean\",\n" +
+        "            \"int\",\n" +
+        "            \"long\",\n" +
+        "            \"float\",\n" +
+        "            \"double\",\n" +
+        "            \"bytes\",\n" +
+        "            \"string\"\n" +
+        "          ]\n" +
+        "        }\n" +
+        "      ]\n" +
+        "    }\n" +
+        "  ]\n" +
+        "}";
+    org.apache.avro.Schema avroSchema = new 
org.apache.avro.Schema.Parser().parse(schema1);
+    org.apache.iceberg.Schema icebergSchema = 
AvroSchemaUtil.toIceberg(avroSchema);
+    org.apache.avro.Schema avroSchemaUnionRecord = 
AvroSchemaUtil.convert(icebergSchema, "test");
+    org.apache.avro.Schema unionRecordSchema =
+        
avroSchemaUnionRecord.getFields().get(0).schema().getTypes().get(1).getValueType().getTypes().get(1);
+
+    List<GenericData.Record> expectedRead = new ArrayList<>();
+    List<GenericData.Record> actualWrite = new ArrayList<>();
+
+    for (long i = 0; i < 10; i++) {
+      Map<String, Object> map = new HashMap<>();
+      Map<String, Object> mapRead = new HashMap<>();
+      updateMapsForUnionSchema(unionRecordSchema, map, mapRead, i);
+      GenericData.Record recordRead = new GenericRecordBuilder(avroSchema)
+          .set("map", mapRead)
+          .build();
+      GenericData.Record record = new GenericRecordBuilder(avroSchema)
+          .set("map", map)
+          .build();
+      actualWrite.add(record);
+      expectedRead.add(recordRead);
+    }
+    writeAndValidate(actualWrite, expectedRead, icebergSchema);
+  }
+
+  private void updateMapsForUnionSchema(
+      org.apache.avro.Schema unionRecordSchema,
+      Map<String, Object> map,
+      Map<String, Object> mapRead,
+      Long index) {
+    map.put("boolean", index % 2 == 0);
+    map.put("int", index.intValue());
+    map.put("long", index);
+    map.put("float", index.floatValue());
+    map.put("double", index.doubleValue());
+    map.put("bytes", ByteBuffer.wrap(("bytes_" + index).getBytes()));
+    map.put("string", "string_" + index);
+
+    map.entrySet().stream().forEach(e -> {
+      String key = e.getKey();
+      GenericData.Record record = 
getGenericRecordForUnionType(unionRecordSchema, map, key);
+      mapRead.put(key, record);
+    });
+  }
+
+  private GenericData.Record getGenericRecordForUnionType(

Review comment:
       If you want to deserialize directly to that representation, you can use 
the same readers to read an single encoded message. Here's the Iceberg generics 
decoder, which is similar but uses Iceberg generics: 
https://github.com/apache/incubator-iceberg/blob/master/data/src/main/java/org/apache/iceberg/data/avro/IcebergDecoder.java

##########
File path: api/src/main/java/org/apache/iceberg/types/Types.java
##########
@@ -526,25 +526,25 @@ public static StructType of(List<NestedField> fields) {
       return new StructType(fields, false);
     }
 
-    public static StructType of(List<NestedField> fields, boolean 
isUnionSchema) {
-      return new StructType(fields, isUnionSchema);
+    public static StructType of(List<NestedField> fields, boolean 
convertedFromUnionSchema) {
+      return new StructType(fields, convertedFromUnionSchema);

Review comment:
       Reading data written with a union schema is okay, but Iceberg cannot 
write data files with a union schema (other than options).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to