sv2000 commented on a change in pull request #3090:
URL: https://github.com/apache/incubator-gobblin/pull/3090#discussion_r481366460



##########
File path: 
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.writer;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericEnumSymbol;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.TypeDescription;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+
+/**
+ * A direct copy of the class with the same name in Dali2 project, other than 
added
+ * UnionConverter implementation to support raw-ingestion for union usecase.
+ *
+ * Note that the implementation in Dali2 is likely to be changed to stay 
consistent with Iceberg's implementation for

Review comment:
       Same as above. Remove reference to Dali.

##########
File path: 
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/AvroOrcSchemaConverter.java
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.writer;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.avro.Schema;
+import org.apache.orc.TypeDescription;
+
+
+/**
+ * A utility class that provides a method to convert {@link Schema} into 
{@link TypeDescription}.
+ */
+public class AvroOrcSchemaConverter {

Review comment:
       Worth considering moving this class to the gobblin-utility module. Not
perfect - but maybe inside AvroUtils? Alternatively, OrcUtils could be moved to
gobblin-utility and this class could go inside that class.

##########
File path: 
gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriterTest.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.orc.OrcFile;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcUnion;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.io.Files;
+
+import lombok.extern.slf4j.Slf4j;
+
+import static org.apache.orc.mapred.OrcMapredRecordReader.nextValue;
+
+
+@Slf4j
+public class GenericRecordToOrcValueWriterTest {
+  @Test
+  public void testUnionRecordConversionWriter()
+      throws Exception {
+    Schema schema =
+        new 
Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("union_test/schema.avsc"));
+
+    TypeDescription orcSchema = AvroOrcSchemaConverter.getOrcSchema(schema);
+    GenericRecordToOrcValueWriter valueWriter = new 
GenericRecordToOrcValueWriter(orcSchema, schema);
+    VectorizedRowBatch rowBatch = orcSchema.createRowBatch();
+
+    List<GenericRecord> recordList = deserializeAvroRecords(this.getClass(), 
schema, "union_test/data.json");
+    for (GenericRecord record : recordList) {
+      valueWriter.write(record, rowBatch);
+    }
+
+    // Flush RowBatch into disk.
+    File tempFile = new File(Files.createTempDir(), "orc");
+    tempFile.deleteOnExit();
+    Path filePath = new Path(tempFile.getAbsolutePath());
+
+    OrcFile.WriterOptions options = OrcFile.writerOptions(new Properties(), 
new Configuration());
+    options.setSchema(orcSchema);
+    Writer orcFileWriter = OrcFile.createWriter(filePath, options);
+    orcFileWriter.addRowBatch(rowBatch);
+    orcFileWriter.close();
+
+    // Load it back and compare.
+    FileSystem fs = FileSystem.get(new Configuration());
+    List<Writable> orcRecords = deserializeOrcRecords(filePath, fs);
+
+    Assert.assertEquals(orcRecords.size(), 5);
+
+    // Knowing all of them are OrcStruct<OrcUnion>, save the effort to 
recursively convert GenericRecord to OrcStruct
+    // for comprehensive comparison which is non-trivial,
+    // although it is also theoretically possible and optimal way for doing 
this unit test.
+    List<OrcUnion> unionList = 
orcRecords.stream().map(this::getUnionFieldFromStruct).collect(Collectors.toList());
+
+    // Constructing all OrcUnion and verify all of them appears in unionList.
+    TypeDescription unionSchema = orcSchema.getChildren().get(0);
+    OrcUnion union_0 = new OrcUnion(unionSchema);
+    union_0.set((byte) 0, new Text("urn:li:member:3"));
+    Assert.assertTrue(unionList.contains(union_0));
+
+    OrcUnion union_1 = new OrcUnion(unionSchema);
+    union_1.set((byte) 0, new Text("urn:li:member:4"));
+    Assert.assertTrue(unionList.contains(union_1));
+
+    OrcUnion union_2 = new OrcUnion(unionSchema);
+    union_2.set((byte) 1, new IntWritable(2));
+    Assert.assertTrue(unionList.contains(union_2));
+
+    OrcUnion union_3 = new OrcUnion(unionSchema);
+    union_3.set((byte) 1, new IntWritable(1));
+    Assert.assertTrue(unionList.contains(union_3));
+
+    OrcUnion union_4 = new OrcUnion(unionSchema);
+    union_4.set((byte) 1, new IntWritable(3));
+    Assert.assertTrue(unionList.contains(union_4));
+  }
+
+  @Test
+  public void testListResize()
+      throws Exception {
+    Schema schema =
+        new 
Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("list_map_test/schema.avsc"));
+
+    TypeDescription orcSchema = AvroOrcSchemaConverter.getOrcSchema(schema);
+    GenericRecordToOrcValueWriter valueWriter = new 
GenericRecordToOrcValueWriter(orcSchema, schema);
+    // Make the batch size very small so that the enlarge behavior would 
easily be triggered.
+    // But this has to more than the number of records that we deserialized 
form data.json, as here we don't reset batch.
+    VectorizedRowBatch rowBatch = orcSchema.createRowBatch(10);
+
+    List<GenericRecord> recordList = deserializeAvroRecords(this.getClass(), 
schema, "list_map_test/data.json");
+    Assert.assertEquals(recordList.size(), 6);
+    for (GenericRecord record : recordList) {
+      valueWriter.write(record, rowBatch);
+    }
+    // Examining resize count, which should happen only once for map and list, 
so totally 2.
+    Assert.assertEquals(valueWriter.resizeCount, 2);
+  }
+
+  /**
+   * Accessing "fields" using reflection to work-around access modifiers.
+   */
+  private OrcUnion getUnionFieldFromStruct(Writable struct) {
+    try {
+      OrcStruct orcStruct = (OrcStruct) struct;
+      Field objectArr = OrcStruct.class.getDeclaredField("fields");
+      objectArr.setAccessible(true);
+      return (OrcUnion) ((Object[]) objectArr.get(orcStruct))[0];
+    } catch (Exception e) {
+      throw new RuntimeException("Cannot access with reflection", e);
+    }
+  }
+
+  public static final List<GenericRecord> deserializeAvroRecords(Class clazz, 
Schema schema, String schemaPath)

Review comment:
       Looks like this method can be moved into a test utils class.

##########
File path: 
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterBuilder.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+
+public class GobblinOrcWriterBuilder extends FsDataWriterBuilder<Schema, 
GenericRecord> {
+  public GobblinOrcWriterBuilder() {
+  }
+
+  @Override
+  public DataWriter<GenericRecord> build()
+      throws IOException {
+    Preconditions.checkNotNull(this.destination);
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(this.writerId));
+    Preconditions.checkNotNull(this.schema);
+
+    switch (this.destination.getType()) {
+      case HDFS:

Review comment:
       Just an FYI - there have been questions on Gitter about whether Gobblin
supports writing in ORC format to S3. What aspects of GobblinOrcWriter can be
reused for future writes to S3?

##########
File path: 
gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/AvroOrcSchemaConverterTest.java
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.orc.TypeDescription;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Preconditions;
+
+import static 
org.apache.gobblin.writer.AvroOrcSchemaConverter.sanitizeNullableSchema;
+
+
+public class AvroOrcSchemaConverterTest {
+  @Test
+  public void testUnionORCSchemaTranslation() throws Exception {
+    Schema avroUnion = SchemaBuilder.record("test")
+        .fields()
+        .name("test_union")
+        
.type(SchemaBuilder.builder().unionOf().stringType().and().intType().and().nullType().endUnion())
+        .noDefault()
+        .endRecord();
+
+    TypeDescription unionSchema = TypeDescription.createUnion()
+        .addUnionChild(TypeDescription.createString())
+        .addUnionChild(TypeDescription.createInt());
+    TypeDescription recordSchemaWithUnion = 
TypeDescription.createStruct().addField("test_union", unionSchema);
+
+    // Verify the schema conversion for Union works
+    Assert.assertEquals(AvroOrcSchemaConverter.getOrcSchema(avroUnion), 
recordSchemaWithUnion);
+
+    //Create a nullable union field
+    Schema nullableAvroUnion = SchemaBuilder.record("test")
+        .fields()
+        .name("test_union")
+        
.type(SchemaBuilder.builder().unionOf().stringType().and().nullType().endUnion())
+        .noDefault()
+        .endRecord();
+    //Assert that Orc schema has flattened the nullable union to the member's 
type
+    Assert.assertEquals(AvroOrcSchemaConverter.getOrcSchema(nullableAvroUnion),
+        TypeDescription.createStruct().addField("test_union", 
TypeDescription.createString()));
+
+    //Create a non nullable union type
+    Schema nonNullableAvroUnion = SchemaBuilder.record("test")
+        .fields()
+        .name("test_union")
+        .type(SchemaBuilder.builder().unionOf().stringType().endUnion())
+        .noDefault()
+        .endRecord();
+    //Ensure that the union type is preserved
+    
Assert.assertEquals(AvroOrcSchemaConverter.getOrcSchema(nonNullableAvroUnion), 
TypeDescription.createStruct()
+        .addField("test_union", 
TypeDescription.createUnion().addUnionChild(TypeDescription.createString())));
+  }
+
+  @Test
+  public void testTrivialAvroSchemaTranslation() throws Exception {
+
+    // Trivial cases
+    Schema avroSchema = SchemaBuilder.record("test")
+        .fields()
+        .name("string_type")
+        .type(SchemaBuilder.builder().stringType())
+        .noDefault()
+        .name("int_type")
+        .type(SchemaBuilder.builder().intType())
+        .noDefault()
+        .endRecord();
+
+    TypeDescription orcSchema = TypeDescription.createStruct()
+        .addField("string_type", TypeDescription.createString())
+        .addField("int_type", TypeDescription.createInt());
+
+    // Top-level record name will not be replicated in conversion result.
+    Assert.assertEquals(avroSchema.getFields(), 
getAvroSchema(orcSchema).getFields());
+  }
+
+  @Test
+  public void testUnionAvroSchemaTranslation() throws Exception {
+    Schema avroSchema = SchemaBuilder.record("test")
+        .fields()
+        .name("union_nested")
+        
.type(SchemaBuilder.builder().unionOf().stringType().and().intType().endUnion())
+        .noDefault()
+        .endRecord();
+    TypeDescription orcSchema = TypeDescription.createStruct()
+        .addField("union_nested", TypeDescription.createUnion()
+            .addUnionChild(TypeDescription.createString())
+            .addUnionChild(TypeDescription.createInt()));
+
+    Assert.assertEquals(avroSchema.getFields(), 
getAvroSchema(orcSchema).getFields());
+  }
+
+  @Test
+  public void testSchemaSanitization() throws Exception {
+
+    // Two field along with null
+    Schema avroSchema = 
SchemaBuilder.builder().unionOf().nullType().and().stringType().and().intType().endUnion();
+    Schema expectedSchema = 
SchemaBuilder.builder().unionOf().stringType().and().intType().endUnion();
+    Assert.assertEquals(sanitizeNullableSchema(avroSchema), expectedSchema);
+
+    // Only one field except null
+    Schema avroSchema_1 = SchemaBuilder.builder()
+        .unionOf()
+        .nullType()
+        .and()
+        .record("test")
+        .fields()
+        .name("aaa")
+        .type(SchemaBuilder.builder().intType())
+        .noDefault()
+        .endRecord()
+        .endUnion();
+    expectedSchema = SchemaBuilder.builder()
+        .record("test")
+        .fields()
+        .name("aaa")
+        .type(SchemaBuilder.builder().intType())
+        .noDefault()
+        .endRecord();
+    Assert.assertEquals(sanitizeNullableSchema(avroSchema_1), expectedSchema);
+  }
+
+  public static Schema getAvroSchema(TypeDescription schema) {

Review comment:
       A candidate to move into a test utils class?

##########
File path: 
gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriterTest.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.orc.OrcFile;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcUnion;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.io.Files;
+
+import lombok.extern.slf4j.Slf4j;
+
+import static org.apache.orc.mapred.OrcMapredRecordReader.nextValue;
+
+
+@Slf4j
+public class GenericRecordToOrcValueWriterTest {
+  @Test
+  public void testUnionRecordConversionWriter()
+      throws Exception {
+    Schema schema =
+        new 
Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("union_test/schema.avsc"));
+
+    TypeDescription orcSchema = AvroOrcSchemaConverter.getOrcSchema(schema);
+    GenericRecordToOrcValueWriter valueWriter = new 
GenericRecordToOrcValueWriter(orcSchema, schema);
+    VectorizedRowBatch rowBatch = orcSchema.createRowBatch();
+
+    List<GenericRecord> recordList = deserializeAvroRecords(this.getClass(), 
schema, "union_test/data.json");
+    for (GenericRecord record : recordList) {
+      valueWriter.write(record, rowBatch);
+    }
+
+    // Flush RowBatch into disk.
+    File tempFile = new File(Files.createTempDir(), "orc");
+    tempFile.deleteOnExit();
+    Path filePath = new Path(tempFile.getAbsolutePath());
+
+    OrcFile.WriterOptions options = OrcFile.writerOptions(new Properties(), 
new Configuration());
+    options.setSchema(orcSchema);
+    Writer orcFileWriter = OrcFile.createWriter(filePath, options);
+    orcFileWriter.addRowBatch(rowBatch);
+    orcFileWriter.close();
+
+    // Load it back and compare.
+    FileSystem fs = FileSystem.get(new Configuration());
+    List<Writable> orcRecords = deserializeOrcRecords(filePath, fs);
+
+    Assert.assertEquals(orcRecords.size(), 5);
+
+    // Knowing all of them are OrcStruct<OrcUnion>, save the effort to 
recursively convert GenericRecord to OrcStruct
+    // for comprehensive comparison which is non-trivial,
+    // although it is also theoretically possible and optimal way for doing 
this unit test.
+    List<OrcUnion> unionList = 
orcRecords.stream().map(this::getUnionFieldFromStruct).collect(Collectors.toList());
+
+    // Constructing all OrcUnion and verify all of them appears in unionList.
+    TypeDescription unionSchema = orcSchema.getChildren().get(0);
+    OrcUnion union_0 = new OrcUnion(unionSchema);
+    union_0.set((byte) 0, new Text("urn:li:member:3"));
+    Assert.assertTrue(unionList.contains(union_0));
+
+    OrcUnion union_1 = new OrcUnion(unionSchema);
+    union_1.set((byte) 0, new Text("urn:li:member:4"));
+    Assert.assertTrue(unionList.contains(union_1));
+
+    OrcUnion union_2 = new OrcUnion(unionSchema);
+    union_2.set((byte) 1, new IntWritable(2));
+    Assert.assertTrue(unionList.contains(union_2));
+
+    OrcUnion union_3 = new OrcUnion(unionSchema);
+    union_3.set((byte) 1, new IntWritable(1));
+    Assert.assertTrue(unionList.contains(union_3));
+
+    OrcUnion union_4 = new OrcUnion(unionSchema);
+    union_4.set((byte) 1, new IntWritable(3));
+    Assert.assertTrue(unionList.contains(union_4));
+  }
+
+  @Test
+  public void testListResize()
+      throws Exception {
+    Schema schema =
+        new 
Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("list_map_test/schema.avsc"));
+
+    TypeDescription orcSchema = AvroOrcSchemaConverter.getOrcSchema(schema);
+    GenericRecordToOrcValueWriter valueWriter = new 
GenericRecordToOrcValueWriter(orcSchema, schema);
+    // Make the batch size very small so that the enlarge behavior would 
easily be triggered.
+    // But this has to more than the number of records that we deserialized 
form data.json, as here we don't reset batch.
+    VectorizedRowBatch rowBatch = orcSchema.createRowBatch(10);
+
+    List<GenericRecord> recordList = deserializeAvroRecords(this.getClass(), 
schema, "list_map_test/data.json");
+    Assert.assertEquals(recordList.size(), 6);
+    for (GenericRecord record : recordList) {
+      valueWriter.write(record, rowBatch);
+    }
+    // Examining resize count, which should happen only once for map and list, 
so totally 2.
+    Assert.assertEquals(valueWriter.resizeCount, 2);
+  }
+
+  /**
+   * Accessing "fields" using reflection to work-around access modifiers.
+   */
+  private OrcUnion getUnionFieldFromStruct(Writable struct) {
+    try {
+      OrcStruct orcStruct = (OrcStruct) struct;
+      Field objectArr = OrcStruct.class.getDeclaredField("fields");
+      objectArr.setAccessible(true);
+      return (OrcUnion) ((Object[]) objectArr.get(orcStruct))[0];
+    } catch (Exception e) {
+      throw new RuntimeException("Cannot access with reflection", e);
+    }
+  }
+
+  public static final List<GenericRecord> deserializeAvroRecords(Class clazz, 
Schema schema, String schemaPath)
+      throws IOException {
+    List<GenericRecord> records = new ArrayList<>();
+
+    GenericDatumReader<GenericRecord> reader = new 
GenericDatumReader<>(schema);
+
+    InputStream dataInputStream = 
clazz.getClassLoader().getResourceAsStream(schemaPath);
+    Decoder decoder = DecoderFactory.get().jsonDecoder(schema, 
dataInputStream);
+    GenericRecord recordContainer = reader.read(null, decoder);
+    ;
+    try {
+      while (recordContainer != null) {
+        records.add(recordContainer);
+        recordContainer = reader.read(null, decoder);
+      }
+    } catch (IOException ioe) {
+      dataInputStream.close();
+    }
+    return records;
+  }
+
+  public static final List<Writable> deserializeOrcRecords(Path orcFilePath, 
FileSystem fs)

Review comment:
       Same comment as above: this method could also be moved into a test utils class.

##########
File path: 
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.writer;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericEnumSymbol;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.TypeDescription;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+
+/**
+ * A direct copy of the class with the same name in Dali2 project, other than 
added

Review comment:
       Update Javadoc to remove references to Dali/Dali2.

##########
File path: 
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.writer;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericEnumSymbol;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.TypeDescription;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+
+/**
+ * A direct copy of the class with the same name in Dali2 project, other than 
added
+ * UnionConverter implementation to support raw-ingestion for union usecase.
+ *
+ * Note that the implementation in Dali2 is likely to be changed to stay 
consistent with Iceberg's implementation for
+ * other converter implementation. That change is unnecessary for Gobblin and 
justify this separated copy.
+ */
+@Slf4j
+public class GenericRecordToOrcValueWriter implements 
OrcValueWriter<GenericRecord> {
+  private static final String ENABLE_SMART_ARRAY_ENLARGE = 
GobblinOrcWriter.ORC_WRITER_PREFIX + "enabledMulValueColumnVectorSmartSizing";
+  private static final boolean DEFAULT_ENABLE_SMART_ARRAY_ENLARGE = false;
+  private static final String ENLARGE_FACTOR_KEY = 
GobblinOrcWriter.ORC_WRITER_PREFIX + "enlargeFactor";
+  private static final int DEFAULT_ENLARGE_FACTOR = 3;
+
+  private boolean enabledSmartSizing;
+  private int enlargeFactor;
+
+  // A rough measure on how much resize is triggered, helping on debugging and 
testing.

Review comment:
       "on how much resize is triggered" -> "of how many times resize is 
triggered".

##########
File path: 
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.state.ConstructState;
+
+
+/**
+ * A wrapper for ORC-core writer without dependency on Hive SerDe library.
+ */
+@Slf4j
+public class GobblinOrcWriter extends FsDataWriter<GenericRecord> {
+  static final String ORC_WRITER_PREFIX = "orcWriter.";
+  private static final String ORC_WRITER_BATCH_SIZE = ORC_WRITER_PREFIX + 
"batchSize";
+  private static final int DEFAULT_ORC_WRITER_BATCH_SIZE = 1000;
+
+  /**
+   * Check comment of {@link #deepCleanRowBatch} for the usage of this 
configuration.
+   */
+  private static final String ORC_WRITER_DEEP_CLEAN_EVERY_BATCH = 
ORC_WRITER_PREFIX + "deepCleanBatch";
+
+  private final GenericRecordToOrcValueWriter valueWriter;
+  @VisibleForTesting
+  final VectorizedRowBatch rowBatch;
+  private final Writer orcFileWriter;
+
+  // the close method may be invoked multiple times, but the underlying writer 
only supports close being called once
+  private volatile boolean closed = false;
+  private final boolean deepCleanBatch;
+
+  private final int batchSize;
+  private final Schema avroSchema;
+
+  public GobblinOrcWriter(FsDataWriterBuilder<Schema, GenericRecord> builder, 
State properties)
+      throws IOException {
+    super(builder, properties);
+
+    log.info("Start to construct a ORC-Native Writer");
+
+    // Create value-writer which is essentially a record-by-record-converter 
with buffering in batch.
+    this.avroSchema = builder.getSchema();
+    TypeDescription typeDescription = 
AvroOrcSchemaConverter.getOrcSchema(this.avroSchema);
+    this.valueWriter = new GenericRecordToOrcValueWriter(typeDescription, 
this.avroSchema, properties);
+    this.batchSize = properties.getPropAsInt(ORC_WRITER_BATCH_SIZE, 
DEFAULT_ORC_WRITER_BATCH_SIZE);
+    this.rowBatch = typeDescription.createRowBatch(this.batchSize);
+    this.deepCleanBatch = 
properties.getPropAsBoolean(ORC_WRITER_DEEP_CLEAN_EVERY_BATCH, false);
+

Review comment:
       Can you log the ORC writer configuration? It is useful for debugging, 
particularly when topic-specific configurations are used.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to