This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 956d384 [GOBBLIN-1250] Open Sourcing ORC writer
956d384 is described below
commit 956d384e192f30c352f3f9d69b9411e9cb400f39
Author: Lei Sun <[email protected]>
AuthorDate: Wed Sep 9 09:58:38 2020 -0700
[GOBBLIN-1250] Open Sourcing ORC writer
WIP: Moving Orc Writer in OSS
Build successful for ORC writer
Create the right package structure for the module
Enrich gradle files
Still need to fix test dependency
Fix testCompile and findbugsMain issue
Remove a linkedin-specific comment
Addressing comments and resolve conflicts
Closes #3090 from autumnust/OrcWriter
---
gobblin-core/build.gradle | 1 -
gobblin-modules/gobblin-orc/build.gradle | 50 +++
.../gobblin/writer/AvroOrcSchemaConverter.java | 137 +++++++
.../gobblin/writer/CloseBeforeFlushException.java | 31 ++
.../writer/GenericRecordToOrcValueWriter.java | 409 +++++++++++++++++++++
.../apache/gobblin/writer/GobblinOrcWriter.java | 266 ++++++++++++++
.../gobblin/writer/GobblinOrcWriterBuilder.java | 50 +++
.../org/apache/gobblin/writer/OrcValueWriter.java | 39 ++
.../gobblin/writer/AvroOrcSchemaConverterTest.java | 203 ++++++++++
.../writer/GenericRecordToOrcValueWriterTest.java | 183 +++++++++
.../gobblin/writer/GobblinOrcWriterTest.java | 171 +++++++++
.../src/test/resources/list_map_test/data.json | 66 ++++
.../src/test/resources/list_map_test/schema.avsc | 21 ++
.../test/resources/orc_writer_list_test/data.json | 61 +++
.../resources/orc_writer_list_test/schema.avsc | 18 +
.../src/test/resources/orc_writer_test/data.json | 8 +
.../src/test/resources/orc_writer_test/schema.avsc | 15 +
.../src/test/resources/union_test/data.json | 26 ++
.../src/test/resources/union_test/schema.avsc | 15 +
gradle/scripts/dependencyDefinitions.gradle | 2 +
20 files changed, 1771 insertions(+), 1 deletion(-)
diff --git a/gobblin-core/build.gradle b/gobblin-core/build.gradle
index 50f56c4..8ec69c5 100644
--- a/gobblin-core/build.gradle
+++ b/gobblin-core/build.gradle
@@ -56,7 +56,6 @@ dependencies {
compile externalDependency.oltu
compile externalDependency.opencsv
compile externalDependency.hadoopHdfs
-
runtimeOnly externalDependency.protobuf
testRuntime externalDependency.hadoopAws
diff --git a/gobblin-modules/gobblin-orc/build.gradle
b/gobblin-modules/gobblin-orc/build.gradle
new file mode 100644
index 0000000..68f7857
--- /dev/null
+++ b/gobblin-modules/gobblin-orc/build.gradle
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
apply plugin: 'java'

dependencies {
  // Need to exclude storage-api (ColumnVector e.g.) brought in from Hive 1.0.1
  // but to use HiveStorageAPI 2.x specified below.
  compile (project(':gobblin-core')) {
    exclude group: 'org.apache.hive', module: 'hive-exec'
  }

  // Cannot use compileOnly as it cannot cover testCompile
  compile externalDependency.avro
  compile externalDependency.hiveStorageApi
  compile externalDependency.orcCore

  // Test-only dependencies; hiveSerDe/orcMapreduce are presumably used by the
  // tests to read back written ORC data — confirm against the test sources.
  testCompile externalDependency.testng
  testCompile externalDependency.mockito
  testCompile externalDependency.hiveSerDe
  testCompile externalDependency.orcMapreduce
}

configurations {
  // Remove xerces dependencies because of versioning issues. Standard JRE implementation should
  // work. See also http://stackoverflow.com/questions/11677572/dealing-with-xerces-hell-in-java-maven
  // HADOOP-5254 and MAPREDUCE-5664
  all*.exclude group: 'xml-apis'
  all*.exclude group: 'xerces'
}

test {
  // Run tests from the repository root so test resources resolve by relative path.
  workingDir rootProject.rootDir
}

// Classification consumed by the surrounding build's module grouping.
ext.classification="library"
diff --git
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/AvroOrcSchemaConverter.java
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/AvroOrcSchemaConverter.java
new file mode 100644
index 0000000..3f16af7
--- /dev/null
+++
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/AvroOrcSchemaConverter.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.writer;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.avro.Schema;
+import org.apache.orc.TypeDescription;
+
+
+/**
+ * A utility class that provides a method to convert {@link Schema} into
{@link TypeDescription}.
+ */
+public class AvroOrcSchemaConverter {
+ public static TypeDescription getOrcSchema(Schema avroSchema) {
+
+ final Schema.Type type = avroSchema.getType();
+ switch (type) {
+ case NULL:
+ // empty union represents null type
+ final TypeDescription nullUnion = TypeDescription.createUnion();
+ return nullUnion;
+ case LONG:
+ return TypeDescription.createLong();
+ case INT:
+ return TypeDescription.createInt();
+ case BYTES:
+ return TypeDescription.createBinary();
+ case ARRAY:
+ return
TypeDescription.createList(getOrcSchema(avroSchema.getElementType()));
+ case RECORD:
+ final TypeDescription recordStruct = TypeDescription.createStruct();
+ for (Schema.Field field2 : avroSchema.getFields()) {
+ final Schema fieldSchema = field2.schema();
+ final TypeDescription fieldType = getOrcSchema(fieldSchema);
+ if (fieldType != null) {
+ recordStruct.addField(field2.name(), fieldType);
+ } else {
+ throw new IllegalStateException("Should never get a null type as
fieldType.");
+ }
+ }
+ return recordStruct;
+ case MAP:
+ return TypeDescription.createMap(
+ // in Avro maps, keys are always strings
+ TypeDescription.createString(),
getOrcSchema(avroSchema.getValueType()));
+ case UNION:
+ final List<Schema> nonNullMembers =
getNonNullMembersOfUnion(avroSchema);
+ if (isNullableUnion(avroSchema, nonNullMembers)) {
+ // a single non-null union member
+ // this is how Avro represents "nullable" types; as a union of the
NULL type with another
+ // since ORC already supports nullability of all types, just use the
child type directly
+ return getOrcSchema(nonNullMembers.get(0));
+ } else {
+ // not a nullable union type; represent as an actual ORC union of
them
+ final TypeDescription union = TypeDescription.createUnion();
+ for (final Schema childSchema : nonNullMembers) {
+ union.addUnionChild(getOrcSchema(childSchema));
+ }
+ return union;
+ }
+ case STRING:
+ return TypeDescription.createString();
+ case FLOAT:
+ return TypeDescription.createFloat();
+ case DOUBLE:
+ return TypeDescription.createDouble();
+ case BOOLEAN:
+ return TypeDescription.createBoolean();
+ case ENUM:
+ // represent as String for now
+ return TypeDescription.createString();
+ case FIXED:
+ return TypeDescription.createBinary();
+ default:
+ throw new IllegalStateException(String.format("Unrecognized Avro type:
%s", type.getName()));
+ }
+ }
+
+ /**
+ * A helper method to check if the union is a nullable union. This check is
to distinguish the case between a nullable and
+ * a non-nullable union, each with a single member. In the former case, we
want to "flatten" to the member type, while
+ * in the case of the latter (i.e. non-nullable type), we want to preserve
the union type.
+ * @param unionSchema
+ * @param nonNullMembers
+ * @return true if the unionSchema is a nullable, false otherwise.
+ */
+ private static boolean isNullableUnion(Schema unionSchema, List<Schema>
nonNullMembers) {
+ return unionSchema.getTypes().size() == 2 && nonNullMembers.size() == 1;
+ }
+
+ /**
+ * In Avro, a union defined with null in the first and only one type after
is considered as a nullable
+ * field instead of the real union type.
+ *
+ * For this type of schema, get all member types to help examine the real
type of it.
+ */
+ public static List<Schema> getNonNullMembersOfUnion(Schema unionSchema) {
+ return unionSchema.getTypes().stream().filter(schema ->
!Schema.Type.NULL.equals(schema.getType()))
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Examine the Avro {@link Schema} object and get rid of "null" type in the
beginning, which essentially indicates
+ * the type is nullable. The elimination of null type from union member list
is important to keep consistent with
+ * {@link TypeDescription} object in terms of index of union member.
+ */
+ public static Schema sanitizeNullableSchema(Schema avroSchema) {
+ if (avroSchema.getType() != Schema.Type.UNION) {
+ return avroSchema;
+ }
+
+ // Processing union schema.
+ List<Schema> members = getNonNullMembersOfUnion(avroSchema);
+ if (isNullableUnion(avroSchema, members)) {
+ return members.get(0);
+ } else {
+ // Reconstruct Avro Schema by eliminating null.
+ return Schema.createUnion(members);
+ }
+ }
+}
diff --git
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/CloseBeforeFlushException.java
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/CloseBeforeFlushException.java
new file mode 100644
index 0000000..0852470
--- /dev/null
+++
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/CloseBeforeFlushException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
/**
 * Thrown when a writer is asked to close while it still holds buffered,
 * un-flushed data for a dataset.
 */
public class CloseBeforeFlushException extends RuntimeException {
  // Name of the dataset whose writer was closed prematurely.
  // Bug fix: this field was previously never assigned, so it was always null.
  final String datasetName;

  /**
   * @param datasetName the dataset whose writer was closed before flushing
   */
  public CloseBeforeFlushException(String datasetName) {
    super(String.format("Dataset %s has an attempt to close writer before buffered data to be flushed", datasetName));
    this.datasetName = datasetName;
  }

  /**
   * @param datasetName the dataset whose writer was closed before flushing
   * @param cause the underlying failure that triggered the premature close
   */
  public CloseBeforeFlushException(String datasetName, Throwable cause) {
    super(String.format("Dataset %s has an attempt to close writer before buffered data to be flushed", datasetName),
        cause);
    this.datasetName = datasetName;
  }
}
\ No newline at end of file
diff --git
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
new file mode 100644
index 0000000..67352c7
--- /dev/null
+++
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.writer;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericEnumSymbol;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.TypeDescription;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+
+
+/**
+ * The converter for buffering rows and forming columnar batch.
+ */
+@Slf4j
+public class GenericRecordToOrcValueWriter implements
OrcValueWriter<GenericRecord> {
+ private static final String ENABLE_SMART_ARRAY_ENLARGE =
GobblinOrcWriter.ORC_WRITER_PREFIX + "enabledMulValueColumnVectorSmartSizing";
+ private static final boolean DEFAULT_ENABLE_SMART_ARRAY_ENLARGE = false;
+ private static final String ENLARGE_FACTOR_KEY =
GobblinOrcWriter.ORC_WRITER_PREFIX + "enlargeFactor";
+ private static final int DEFAULT_ENLARGE_FACTOR = 3;
+
+ private boolean enabledSmartSizing;
+ private int enlargeFactor;
+
+ // A rough measure of how many times resize is triggered, helping on
debugging and testing.
+ @VisibleForTesting
+ public int resizeCount = 0;
+
+ /**
+ * The interface for the conversion from GenericRecord to ORC's
ColumnVectors.
+ */
+ interface Converter {
+ /**
+ * Take a value from the Generic record data value and add it to the ORC
output.
+ * @param rowId the row in the ColumnVector
+ * @param column either the column number or element number
+ * @param data Object which contains the data
+ * @param output the ColumnVector to put the value into
+ */
+ void addValue(int rowId, int column, Object data, ColumnVector output);
+ }
+
+ private final Converter[] converters;
+
+ public GenericRecordToOrcValueWriter(TypeDescription typeDescription, Schema
avroSchema) {
+ converters = buildConverters(typeDescription, avroSchema);
+ this.enabledSmartSizing = DEFAULT_ENABLE_SMART_ARRAY_ENLARGE;
+ this.enlargeFactor = DEFAULT_ENLARGE_FACTOR;
+ }
+
+ public GenericRecordToOrcValueWriter(TypeDescription typeDescription, Schema
avroSchema, State state) {
+ this(typeDescription, avroSchema);
+ this.enabledSmartSizing =
state.getPropAsBoolean(ENABLE_SMART_ARRAY_ENLARGE,
DEFAULT_ENABLE_SMART_ARRAY_ENLARGE);
+ this.enlargeFactor = state.getPropAsInt(ENLARGE_FACTOR_KEY,
DEFAULT_ENLARGE_FACTOR);
+ }
+
+ @Override
+ public void write(GenericRecord value, VectorizedRowBatch output)
+ throws IOException {
+
+ int row = output.size++;
+ for (int c = 0; c < converters.length; ++c) {
+ ColumnVector col = output.cols[c];
+ if (value.get(c) == null) {
+ col.noNulls = false;
+ col.isNull[row] = true;
+ } else {
+ col.isNull[row] = false;
+ converters[c].addValue(row, c, value.get(c), col);
+ }
+ }
+ }
+
+ static class BooleanConverter implements Converter {
+ public void addValue(int rowId, int column, Object data, ColumnVector
output) {
+ ((LongColumnVector) output).vector[rowId] = (boolean) data ? 1 : 0;
+ }
+ }
+
+ static class ByteConverter implements Converter {
+ public void addValue(int rowId, int column, Object data, ColumnVector
output) {
+ ((LongColumnVector) output).vector[rowId] = (byte) data;
+ }
+ }
+
+ static class ShortConverter implements Converter {
+ public void addValue(int rowId, int column, Object data, ColumnVector
output) {
+ ((LongColumnVector) output).vector[rowId] = (short) data;
+ }
+ }
+
+ static class IntConverter implements Converter {
+ public void addValue(int rowId, int column, Object data, ColumnVector
output) {
+ ((LongColumnVector) output).vector[rowId] = (int) data;
+ }
+ }
+
+ static class LongConverter implements Converter {
+ public void addValue(int rowId, int column, Object data, ColumnVector
output) {
+ ((LongColumnVector) output).vector[rowId] = (long) data;
+ }
+ }
+
+ static class FloatConverter implements Converter {
+ public void addValue(int rowId, int column, Object data, ColumnVector
output) {
+ ((DoubleColumnVector) output).vector[rowId] = (float) data;
+ }
+ }
+
+ static class DoubleConverter implements Converter {
+ public void addValue(int rowId, int column, Object data, ColumnVector
output) {
+ ((DoubleColumnVector) output).vector[rowId] = (double) data;
+ }
+ }
+
+ static class StringConverter implements Converter {
+ public void addValue(int rowId, int column, Object data, ColumnVector
output) {
+ final byte[] value;
+ if (data instanceof GenericEnumSymbol) {
+ value = data.toString().getBytes(StandardCharsets.UTF_8);
+ } else if (data instanceof Enum) {
+ value = ((Enum) data).name().getBytes(StandardCharsets.UTF_8);
+ } else if (data instanceof Utf8) {
+ value = ((Utf8) data).getBytes();
+ } else {
+ value = ((String) data).getBytes(StandardCharsets.UTF_8);
+ }
+ ((BytesColumnVector) output).setRef(rowId, value, 0, value.length);
+ }
+ }
+
+ static class BytesConverter implements Converter {
+ public void addValue(int rowId, int column, Object data, ColumnVector
output) {
+ final byte[] value;
+ if (data instanceof GenericFixed) {
+ value = ((GenericFixed) data).bytes();
+ } else if (data instanceof ByteBuffer) {
+ value = ((ByteBuffer) data).array();
+ } else {
+ value = (byte[]) data;
+ }
+ ((BytesColumnVector) output).setRef(rowId, value, 0, value.length);
+ }
+ }
+
+ static class DecimalConverter implements Converter {
+
+ public void addValue(int rowId, int column, Object data, ColumnVector
output) {
+ ((DecimalColumnVector)
output).vector[rowId].set(HiveDecimal.create((BigDecimal) data));
+ }
+ }
+
+ class StructConverter implements Converter {
+ private final Converter[] children;
+
+ StructConverter(TypeDescription schema, Schema avroSchema) {
+ children = new Converter[schema.getChildren().size()];
+ for (int c = 0; c < children.length; ++c) {
+ children[c] = buildConverter(schema.getChildren().get(c),
avroSchema.getFields().get(c).schema());
+ }
+ }
+
+ public void addValue(int rowId, int column, Object data, ColumnVector
output) {
+ GenericRecord value = (GenericRecord) data;
+ StructColumnVector cv = (StructColumnVector) output;
+ for (int c = 0; c < children.length; ++c) {
+ ColumnVector field = cv.fields[c];
+ if (value.get(c) == null) {
+ field.noNulls = false;
+ field.isNull[rowId] = true;
+ } else {
+ field.isNull[rowId] = false;
+ children[c].addValue(rowId, c, value.get(c), field);
+ }
+ }
+ }
+ }
+
+ class UnionConverter implements Converter {
+ private final Converter[] children;
+ private final Schema unionSchema;
+
+ UnionConverter(TypeDescription schema, Schema avroSchema) {
+ children = new Converter[schema.getChildren().size()];
+ for (int c = 0; c < children.length; ++c) {
+ children[c] = buildConverter(schema.getChildren().get(c),
avroSchema.getTypes().get(c));
+ }
+ this.unionSchema = avroSchema;
+ }
+
+ /**
+ * @param data Object which contains the data, for Union, this data object
is already the
+ * original data type without union wrapper.
+ */
+ @Override
+ public void addValue(int rowId, int column, Object data, ColumnVector
output) {
+ UnionColumnVector cv = (UnionColumnVector) output;
+ int tag = (data != null) ? GenericData.get().resolveUnion(unionSchema,
data) : children.length;
+
+ for (int c = 0; c < children.length; ++c) {
+ ColumnVector field = cv.fields[c];
+ // If c == tag that indicates data must not be null
+ if (c == tag) {
+ field.isNull[rowId] = false;
+ cv.tags[rowId] = c;
+ children[c].addValue(rowId, c, data, field);
+ } else {
+ field.noNulls = false;
+ field.isNull[rowId] = true;
+ }
+ }
+ }
+ }
+
+ class ListConverter implements Converter {
+ private final Converter children;
+ // Keep track of total number of rows being added to help calculate row's
avg size.
+ private int rowsAdded;
+
+ ListConverter(TypeDescription schema, Schema avroSchema) {
+ children = buildConverter(schema.getChildren().get(0),
avroSchema.getElementType());
+ rowsAdded = 0;
+ }
+
+ public void addValue(int rowId, int column, Object data, ColumnVector
output) {
+ rowsAdded += 1;
+ List value = (List) data;
+ ListColumnVector cv = (ListColumnVector) output;
+
+ // record the length and start of the list elements
+ cv.lengths[rowId] = value.size();
+ cv.offsets[rowId] = cv.childCount;
+ cv.childCount += cv.lengths[rowId];
+ // make sure the child is big enough
+ // If seeing child array being saturated, will need to expand with a
reasonable amount.
+ if (cv.childCount > cv.child.isNull.length) {
+ int resizedLength = resize(rowsAdded, cv.isNull.length, cv.childCount);
+ cv.child.ensureSize(resizedLength, true);
+ }
+
+ // Add each element
+ for (int e = 0; e < cv.lengths[rowId]; ++e) {
+ int offset = (int) (e + cv.offsets[rowId]);
+ if (value.get(e) == null) {
+ cv.child.noNulls = false;
+ cv.child.isNull[offset] = true;
+ } else {
+ cv.child.isNull[offset] = false;
+ children.addValue(offset, e, value.get(e), cv.child);
+ }
+ }
+ }
+ }
+
+ class MapConverter implements Converter {
+ private final Converter keyConverter;
+ private final Converter valueConverter;
+ // Keep track of total number of rows being added to help calculate row's
avg size.
+ private int rowsAdded;
+
+ MapConverter(TypeDescription schema, Schema avroSchema) {
+ keyConverter = buildConverter(schema.getChildren().get(0),
SchemaBuilder.builder().stringType());
+ valueConverter = buildConverter(schema.getChildren().get(1),
avroSchema.getValueType());
+ rowsAdded = 0;
+ }
+
+ public void addValue(int rowId, int column, Object data, ColumnVector
output) {
+ rowsAdded += 1;
+ Map<Object, Object> map = (Map<Object, Object>) data;
+ Set<Map.Entry<Object, Object>> entries = map.entrySet();
+ MapColumnVector cv = (MapColumnVector) output;
+
+ // record the length and start of the list elements
+ cv.lengths[rowId] = entries.size();
+ cv.offsets[rowId] = cv.childCount;
+ cv.childCount += cv.lengths[rowId];
+ // make sure the child is big enough
+ if (cv.childCount > cv.keys.isNull.length) {
+ int resizedLength = resize(rowsAdded, cv.isNull.length, cv.childCount);
+ cv.keys.ensureSize(resizedLength, true);
+ cv.values.ensureSize(resizedLength, true);
+ }
+ // Add each element
+ int e = 0;
+ for (Map.Entry entry : entries) {
+ int offset = (int) (e + cv.offsets[rowId]);
+ if (entry.getKey() == null) {
+ cv.keys.noNulls = false;
+ cv.keys.isNull[offset] = true;
+ } else {
+ cv.keys.isNull[offset] = false;
+ keyConverter.addValue(offset, e, entry.getKey(), cv.keys);
+ }
+ if (entry.getValue() == null) {
+ cv.values.noNulls = false;
+ cv.values.isNull[offset] = true;
+ } else {
+ cv.values.isNull[offset] = false;
+ valueConverter.addValue(offset, e, entry.getValue(), cv.values);
+ }
+ e++;
+ }
+ }
+ }
+
+ /**
+ * Resize the child-array size based on configuration.
+ * If smart-sizing is enabled, it will using the avg size of container and
expand the whole child array to
+ * delta(avgSizeOfContainer * numberOfContainer(batchSize)) the first time
this is called.
+ * If there's further resize requested, it will add delta again to be
conservative, but chances of adding delta
+ * for multiple times should be low, unless the container size is
fluctuating too much.
+ */
+ private int resize(int rowsAdded, int batchSize, int currentSize) {
+ resizeCount += 1;
+ log.info(String.format("It has been resized %s times in current writer",
resizeCount));
+ return enabledSmartSizing ? currentSize + (currentSize / rowsAdded + 1) *
batchSize : enlargeFactor * currentSize;
+ }
+
+ private Converter buildConverter(TypeDescription schema, Schema avroSchema) {
+ switch (schema.getCategory()) {
+ case BOOLEAN:
+ return new BooleanConverter();
+ case BYTE:
+ return new ByteConverter();
+ case SHORT:
+ return new ShortConverter();
+ case INT:
+ return new IntConverter();
+ case LONG:
+ return new LongConverter();
+ case FLOAT:
+ return new FloatConverter();
+ case DOUBLE:
+ return new DoubleConverter();
+ case BINARY:
+ return new BytesConverter();
+ case STRING:
+ case CHAR:
+ case VARCHAR:
+ return new StringConverter();
+ case DECIMAL:
+ return new DecimalConverter();
+ case STRUCT:
+ return new StructConverter(schema,
AvroOrcSchemaConverter.sanitizeNullableSchema(avroSchema));
+ case LIST:
+ return new ListConverter(schema,
AvroOrcSchemaConverter.sanitizeNullableSchema(avroSchema));
+ case MAP:
+ return new MapConverter(schema,
AvroOrcSchemaConverter.sanitizeNullableSchema(avroSchema));
+ case UNION:
+ return new UnionConverter(schema,
AvroOrcSchemaConverter.sanitizeNullableSchema(avroSchema));
+ default:
+ throw new IllegalArgumentException("Unhandled type " + schema);
+ }
+ }
+
+ private Converter[] buildConverters(TypeDescription schema, Schema
avroSchema) {
+ if (schema.getCategory() != TypeDescription.Category.STRUCT) {
+ throw new IllegalArgumentException("Top level must be a struct " +
schema);
+ }
+ List<TypeDescription> children = schema.getChildren();
+ Converter[] result = new Converter[children.size()];
+ for (int c = 0; c < children.size(); ++c) {
+ result[c] = buildConverter(children.get(c),
avroSchema.getFields().get(c).schema());
+ }
+ return result;
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
new file mode 100644
index 0000000..22957b0
--- /dev/null
+++
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.state.ConstructState;
+
+
+/**
+ * A wrapper for ORC-core writer without dependency on Hive SerDe library.
+ */
+@Slf4j
+public class GobblinOrcWriter extends FsDataWriter<GenericRecord> {
+ static final String ORC_WRITER_PREFIX = "orcWriter.";
+ private static final String ORC_WRITER_BATCH_SIZE = ORC_WRITER_PREFIX +
"batchSize";
+ private static final int DEFAULT_ORC_WRITER_BATCH_SIZE = 1000;
+
+ /**
+ * Check comment of {@link #deepCleanRowBatch} for the usage of this
configuration.
+ */
+ private static final String ORC_WRITER_DEEP_CLEAN_EVERY_BATCH =
ORC_WRITER_PREFIX + "deepCleanBatch";
+
+ private final GenericRecordToOrcValueWriter valueWriter;
+ @VisibleForTesting
+ final VectorizedRowBatch rowBatch;
+ private final Writer orcFileWriter;
+
+ // the close method may be invoked multiple times, but the underlying writer
only supports close being called once
+ private volatile boolean closed = false;
+ private final boolean deepCleanBatch;
+
+ private final int batchSize;
+ private final Schema avroSchema;
+
+ public GobblinOrcWriter(FsDataWriterBuilder<Schema, GenericRecord> builder,
State properties)
+ throws IOException {
+ super(builder, properties);
+
+
+ // Create value-writer which is essentially a record-by-record-converter
with buffering in batch.
+ this.avroSchema = builder.getSchema();
+ TypeDescription typeDescription =
AvroOrcSchemaConverter.getOrcSchema(this.avroSchema);
+ this.valueWriter = new GenericRecordToOrcValueWriter(typeDescription,
this.avroSchema, properties);
+ this.batchSize = properties.getPropAsInt(ORC_WRITER_BATCH_SIZE,
DEFAULT_ORC_WRITER_BATCH_SIZE);
+ this.rowBatch = typeDescription.createRowBatch(this.batchSize);
+ this.deepCleanBatch =
properties.getPropAsBoolean(ORC_WRITER_DEEP_CLEAN_EVERY_BATCH, false);
+
+ log.info("Start to construct a ORC-Native Writer, with batchSize:" +
batchSize + ", enable batchDeepClean:"
+ + deepCleanBatch + "\n, schema in avro format:" + this.avroSchema);
+
+ // Create file-writer
+ Configuration conf = new Configuration();
+ // Populate job Configurations into Conf as well so that configurations
related to ORC writer can be tuned easily.
+ for (Object key : properties.getProperties().keySet()) {
+ conf.set((String) key, properties.getProp((String) key));
+ }
+
+ OrcFile.WriterOptions options =
OrcFile.writerOptions(properties.getProperties(), conf);
+ options.setSchema(typeDescription);
+
+ // For buffer-writer, flush has to be executed before close so it is
better we maintain the life-cycle of fileWriter
+ // instead of delegating it to closer object in FsDataWriter.
+ this.orcFileWriter = OrcFile.createWriter(this.stagingFile, options);
+ }
+
+ @Override
+ public long recordsWritten() {
+ return this.orcFileWriter.getNumberOfRows();
+ }
+
+ @Override
+ public State getFinalState() {
+ /**
+ * Creating {@link ConstructState} to provide overwrite of {@link
WorkUnitState} from constructs.
+ */
+ ConstructState state = new ConstructState(super.getFinalState());
+ try {
+ state.addOverwriteProperties(new
State(getOrcSchemaAttrs(this.avroSchema.toString())));
+ } catch (SerDeException | IOException e) {
+ throw new RuntimeException("Failure to set schema metadata in finalState
properly which "
+ + "could possible lead to incorrect data registration", e);
+ }
+
+ return state;
+ }
+
+ @Override
+ public void flush()
+ throws IOException {
+ if (rowBatch.size > 0) {
+ orcFileWriter.addRowBatch(rowBatch);
+ rowBatch.reset();
+ if (deepCleanBatch) {
+ deepCleanRowBatch(rowBatch);
+ }
+ }
+ }
+
+ private synchronized void closeInternal()
+ throws IOException {
+ if (!closed) {
+ this.flush();
+ this.orcFileWriter.close();
+ this.closed = true;
+ } else {
+ // Throw fatal exception if there's outstanding buffered data since
there's risk losing data if proceeds.
+ if (rowBatch.size > 0) {
+ throw new CloseBeforeFlushException(this.avroSchema.getName());
+ }
+ }
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ closeInternal();
+ super.close();
+ }
+
+ /**
+ * Extra careful about the fact: super.commit() invoke closer.close based on
its semantics of "commit".
+ * That means close can happen in both commit() and close()
+ */
+ @Override
+ public void commit()
+ throws IOException {
+ closeInternal();
+ super.commit();
+ }
+
+ /**
+ * Note: orc.rows.between.memory.checks is the configuration available to
tune memory-check sensitivity in ORC-Core
+ * library. By default it is set to 5000. If the user-application is dealing
with large-row Kafka topics for example,
+ * one should consider lower this value to make memory-check more active.
+ */
+ @Override
+ public void write(GenericRecord record)
+ throws IOException {
+ valueWriter.write(record, rowBatch);
+ if (rowBatch.size == this.batchSize) {
+ orcFileWriter.addRowBatch(rowBatch);
+ rowBatch.reset();
+ if (deepCleanBatch) {
+ log.info("A reset of rowBatch is triggered - releasing holding memory
for large object");
+ deepCleanRowBatch(rowBatch);
+ }
+ }
+ }
+
+ /**
+ * The reset call of {@link VectorizedRowBatch} doesn't release the memory
occupied by each {@link ColumnVector}'s child,
+ * which is usually an array of objects, while it only set those value to
null.
+ * This method ensure the reference to the child array for {@link
ColumnVector} are released and gives a hint of GC,
+ * so that each reset could release the memory pre-allocated by {@link
ColumnVector#ensureSize(int, boolean)} method.
+ *
+ * This feature is configurable and should only be turned on if a dataset is:
+ * 1. Has large per-record size.
+ * 2. Has {@link
org.apache.hadoop.hive.ql.exec.vector.MultiValuedColumnVector} as part of
schema,
+ * like array, map and all nested structures containing these.
+ */
+ @VisibleForTesting
+ void deepCleanRowBatch(VectorizedRowBatch rowBatch) {
+ for(int i = 0; i < rowBatch.cols.length; ++i) {
+ ColumnVector cv = rowBatch.cols[i];
+ if (cv != null) {
+ removeRefOfColumnVectorChild(cv);
+ }
+ }
+ }
+
+ /**
+ * Set the child field of {@link ColumnVector} to null, assuming input
{@link ColumnVector} is nonNull.
+ */
+ private void removeRefOfColumnVectorChild(ColumnVector cv) {
+ if (cv instanceof StructColumnVector) {
+ StructColumnVector structCv = (StructColumnVector) cv;
+ for (ColumnVector childCv: structCv.fields) {
+ removeRefOfColumnVectorChild(childCv);
+ }
+ } else if (cv instanceof ListColumnVector) {
+ ListColumnVector listCv = (ListColumnVector) cv;
+ removeRefOfColumnVectorChild(listCv.child);
+ } else if (cv instanceof MapColumnVector) {
+ MapColumnVector mapCv = (MapColumnVector) cv;
+ removeRefOfColumnVectorChild(mapCv.keys);
+ removeRefOfColumnVectorChild(mapCv.values);
+ } else if (cv instanceof UnionColumnVector) {
+ UnionColumnVector unionCv = (UnionColumnVector) cv;
+ for (ColumnVector unionChildCv : unionCv.fields) {
+ removeRefOfColumnVectorChild(unionChildCv);
+ }
+ } else if (cv instanceof LongColumnVector) {
+ ((LongColumnVector) cv).vector = null;
+ } else if (cv instanceof DoubleColumnVector) {
+ ((DoubleColumnVector) cv).vector = null;
+ } else if (cv instanceof BytesColumnVector) {
+ ((BytesColumnVector) cv).vector = null;
+ ((BytesColumnVector) cv).start = null;
+ ((BytesColumnVector) cv).length = null;
+ } else if (cv instanceof DecimalColumnVector) {
+ ((DecimalColumnVector) cv).vector = null;
+ }
+ }
+
+ @Override
+ public boolean isSpeculativeAttemptSafe() {
+ return this.writerAttemptIdOptional.isPresent() && this.getClass() ==
GobblinOrcWriter.class;
+ }
+
+ public static Properties getOrcSchemaAttrs(String avroSchemaString)
+ throws SerDeException, IOException {
+ Properties properties = new Properties();
+
properties.setProperty(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(),
avroSchemaString);
+
+ AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(
+ AvroSerdeUtils.determineSchemaOrThrowException(properties));
+
+ properties.setProperty("columns", StringUtils.join(aoig.getColumnNames(),
","));
+ properties.setProperty("columns.types",
StringUtils.join(aoig.getColumnTypes(), ","));
+
+ return properties;
+ }
+}
diff --git
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterBuilder.java
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterBuilder.java
new file mode 100644
index 0000000..3097397
--- /dev/null
+++
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterBuilder.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+
/**
 * The WriterBuilder extension to create {@link GobblinOrcWriter} on top of {@link FsDataWriterBuilder}.
 */
public class GobblinOrcWriterBuilder extends FsDataWriterBuilder<Schema, GenericRecord> {
  public GobblinOrcWriterBuilder() {
  }

  /**
   * Builds a {@link GobblinOrcWriter} for the configured destination.
   *
   * @return a new {@link GobblinOrcWriter} writing to the configured HDFS destination.
   * @throws IOException if the underlying writer cannot be created.
   * @throws RuntimeException if the destination type is anything other than HDFS.
   */
  @Override
  public DataWriter<GenericRecord> build()
      throws IOException {
    // Fail fast on an incompletely-configured builder.
    Preconditions.checkNotNull(this.destination);
    Preconditions.checkArgument(!Strings.isNullOrEmpty(this.writerId));
    Preconditions.checkNotNull(this.schema);

    switch (this.destination.getType()) {
      case HDFS:
        return new GobblinOrcWriter(this, this.destination.getProperties());
      default:
        // Only HDFS-backed destinations are supported by this builder.
        throw new RuntimeException("Unknown destination type: " + this.destination.getType());
    }
  }
}
diff --git
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/OrcValueWriter.java
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/OrcValueWriter.java
new file mode 100644
index 0000000..e500da7
--- /dev/null
+++
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/OrcValueWriter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
+
+/**
+ * Write data value of a schema.
+ * @param <T> Indicating the input type, {@link
org.apache.avro.generic.GenericRecord} for example.
+ */
+public interface OrcValueWriter<T> {
+
+ /**
+ * Writes the data.
+ * @param value the data value to write.
+ * @param output the VectorizedRowBatch to which the output will be written.
+ * @throws IOException if there's any IO error while writing the data value.
+ */
+ void write(T value, VectorizedRowBatch output)
+ throws IOException;
+}
\ No newline at end of file
diff --git
a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/AvroOrcSchemaConverterTest.java
b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/AvroOrcSchemaConverterTest.java
new file mode 100644
index 0000000..e03c0eb
--- /dev/null
+++
b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/AvroOrcSchemaConverterTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.orc.TypeDescription;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Preconditions;
+
+import static
org.apache.gobblin.writer.AvroOrcSchemaConverter.sanitizeNullableSchema;
+
+
/**
 * Tests for {@link AvroOrcSchemaConverter}: Avro-to-ORC schema translation (including union
 * handling and nullable-union flattening) and nullable-schema sanitization. Also hosts
 * {@link #getAvroSchema(TypeDescription)}, a test-only ORC-to-Avro reverse translation helper.
 */
public class AvroOrcSchemaConverterTest {
  /** Unions translate member-by-member; a union that merely wraps "null" collapses to the member type. */
  @Test
  public void testUnionORCSchemaTranslation() throws Exception {
    Schema avroUnion = SchemaBuilder.record("test")
        .fields()
        .name("test_union")
        .type(SchemaBuilder.builder().unionOf().stringType().and().intType().and().nullType().endUnion())
        .noDefault()
        .endRecord();

    TypeDescription unionSchema = TypeDescription.createUnion()
        .addUnionChild(TypeDescription.createString())
        .addUnionChild(TypeDescription.createInt());
    TypeDescription recordSchemaWithUnion = TypeDescription.createStruct().addField("test_union", unionSchema);

    // Verify the schema conversion for Union works; note the "null" member is dropped on the ORC side.
    Assert.assertEquals(AvroOrcSchemaConverter.getOrcSchema(avroUnion), recordSchemaWithUnion);

    // Create a nullable union field (union of exactly one real type plus null).
    Schema nullableAvroUnion = SchemaBuilder.record("test")
        .fields()
        .name("test_union")
        .type(SchemaBuilder.builder().unionOf().stringType().and().nullType().endUnion())
        .noDefault()
        .endRecord();
    // Assert that Orc schema has flattened the nullable union to the member's type.
    Assert.assertEquals(AvroOrcSchemaConverter.getOrcSchema(nullableAvroUnion),
        TypeDescription.createStruct().addField("test_union", TypeDescription.createString()));

    // Create a non nullable union type (single-member union, no null branch).
    Schema nonNullableAvroUnion = SchemaBuilder.record("test")
        .fields()
        .name("test_union")
        .type(SchemaBuilder.builder().unionOf().stringType().endUnion())
        .noDefault()
        .endRecord();
    // Ensure that the union type is preserved (no flattening without a null member).
    Assert.assertEquals(AvroOrcSchemaConverter.getOrcSchema(nonNullableAvroUnion), TypeDescription.createStruct()
        .addField("test_union", TypeDescription.createUnion().addUnionChild(TypeDescription.createString())));
  }

  /** Flat record of primitives round-trips field-for-field through the converter. */
  @Test
  public void testTrivialAvroSchemaTranslation() throws Exception {

    // Trivial cases
    Schema avroSchema = SchemaBuilder.record("test")
        .fields()
        .name("string_type")
        .type(SchemaBuilder.builder().stringType())
        .noDefault()
        .name("int_type")
        .type(SchemaBuilder.builder().intType())
        .noDefault()
        .endRecord();

    TypeDescription orcSchema = TypeDescription.createStruct()
        .addField("string_type", TypeDescription.createString())
        .addField("int_type", TypeDescription.createInt());

    // Top-level record name will not be replicated in conversion result,
    // so compare field lists rather than whole schemas.
    Assert.assertEquals(avroSchema.getFields(), getAvroSchema(orcSchema).getFields());
  }

  /** Multi-member union inside a record survives a round trip through the reverse helper. */
  @Test
  public void testUnionAvroSchemaTranslation() throws Exception {
    Schema avroSchema = SchemaBuilder.record("test")
        .fields()
        .name("union_nested")
        .type(SchemaBuilder.builder().unionOf().stringType().and().intType().endUnion())
        .noDefault()
        .endRecord();
    TypeDescription orcSchema = TypeDescription.createStruct()
        .addField("union_nested", TypeDescription.createUnion()
            .addUnionChild(TypeDescription.createString())
            .addUnionChild(TypeDescription.createInt()));

    Assert.assertEquals(avroSchema.getFields(), getAvroSchema(orcSchema).getFields());
  }

  /**
   * sanitizeNullableSchema drops the "null" branch of a union; when one real member remains,
   * the union wrapper itself is removed.
   */
  @Test
  public void testSchemaSanitization() throws Exception {

    // Two field along with null: null is dropped but the union wrapper is kept.
    Schema avroSchema = SchemaBuilder.builder().unionOf().nullType().and().stringType().and().intType().endUnion();
    Schema expectedSchema = SchemaBuilder.builder().unionOf().stringType().and().intType().endUnion();
    Assert.assertEquals(sanitizeNullableSchema(avroSchema), expectedSchema);

    // Only one field except null: the union collapses to the bare member schema.
    Schema avroSchema_1 = SchemaBuilder.builder()
        .unionOf()
        .nullType()
        .and()
        .record("test")
        .fields()
        .name("aaa")
        .type(SchemaBuilder.builder().intType())
        .noDefault()
        .endRecord()
        .endUnion();
    expectedSchema = SchemaBuilder.builder()
        .record("test")
        .fields()
        .name("aaa")
        .type(SchemaBuilder.builder().intType())
        .noDefault()
        .endRecord();
    Assert.assertEquals(sanitizeNullableSchema(avroSchema_1), expectedSchema);
  }

  /**
   * Test-only reverse translation from an ORC {@link TypeDescription} to an Avro {@link Schema}.
   * ORC categories with no Avro counterpart (BYTE, SHORT, DATE, TIMESTAMP, VARCHAR, CHAR, DECIMAL)
   * are rejected with {@link UnsupportedOperationException}.
   */
  public static Schema getAvroSchema(TypeDescription schema) {
    final TypeDescription.Category type = schema.getCategory();
    switch (type) {
      case BYTE:
      case SHORT:
      case DATE:
      case TIMESTAMP:
      case VARCHAR:
      case CHAR:
      case DECIMAL:
        throw new UnsupportedOperationException("Types like BYTE and SHORT (and many more) are not supported in Avro");
      case BOOLEAN:
        return SchemaBuilder.builder().booleanType();
      case INT:
        return SchemaBuilder.builder().intType();
      case LONG:
        return SchemaBuilder.builder().longType();
      case STRUCT:
        // TODO: Cases that current implementation cannot support:
        // union<struct1, struct2, ..., structN>
        // All these structs will be assigned with the same name, while calling "endUnion" an exception will be thrown.
        // We would workaround this by assigning randomly-picked name while that will cause difficulties in name-related
        // resolution after translation, like `resolveUnion` method which is relying on name.
        SchemaBuilder.FieldAssembler assembler = SchemaBuilder.record("nested").fields();
        List<String> childFieldNames = schema.getFieldNames();
        List<TypeDescription> childrenSchemas = schema.getChildren();
        for (int i = 0; i < childrenSchemas.size(); i++) {
          String fieldName = childFieldNames.get(i);
          assembler = assembler.name(fieldName).type(getAvroSchema(childrenSchemas.get(i))).noDefault();
        }
        return (Schema) assembler.endRecord();
      case STRING:
        return SchemaBuilder.builder().stringType();
      case BINARY:
        return SchemaBuilder.builder().bytesType();
      case DOUBLE:
        return SchemaBuilder.builder().doubleType();
      case FLOAT:
        return SchemaBuilder.builder().floatType();
      case LIST:
        return SchemaBuilder.builder().array().items(getAvroSchema(schema.getChildren().get(0)));
      case MAP:
        // ORC maps carry exactly two children (key, value); Avro maps require string keys.
        Preconditions.checkArgument(schema.getChildren().get(0).getCategory().equals(TypeDescription.Category.STRING));
        Preconditions.checkArgument(schema.getChildren().size() == 2);
        return SchemaBuilder.builder().map().values(getAvroSchema(schema.getChildren().get(1)));
      case UNION:
        SchemaBuilder.BaseTypeBuilder builder = SchemaBuilder.builder().unionOf();
        List<TypeDescription> unionChildrenSchemas = schema.getChildren();
        for (int i = 0; i < unionChildrenSchemas.size(); i++) {
          if (i < unionChildrenSchemas.size() - 1) {
            builder = ((SchemaBuilder.UnionAccumulator<Schema>) builder.type(
                getAvroSchema(unionChildrenSchemas.get(i)))).and();
          } else {
            return ((SchemaBuilder.UnionAccumulator<Schema>) builder.type(
                getAvroSchema(unionChildrenSchemas.get(i)))).endUnion();
          }
        }
        // NOTE(review): no break here — a union with zero children falls through to default and
        // throws; for non-empty unions the loop always returns. Confirm empty unions cannot occur.
      default:
        throw new IllegalStateException("Unrecognized ORC type:" + schema.getCategory());
    }
  }
}
\ No newline at end of file
diff --git
a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriterTest.java
b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriterTest.java
new file mode 100644
index 0000000..4660369
--- /dev/null
+++
b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriterTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.orc.OrcFile;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcUnion;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.io.Files;
+
+import lombok.extern.slf4j.Slf4j;
+
+import static org.apache.orc.mapred.OrcMapredRecordReader.nextValue;
+
+
+@Slf4j
+public class GenericRecordToOrcValueWriterTest {
+ @Test
+ public void testUnionRecordConversionWriter()
+ throws Exception {
+ Schema schema =
+ new
Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("union_test/schema.avsc"));
+
+ TypeDescription orcSchema = AvroOrcSchemaConverter.getOrcSchema(schema);
+ GenericRecordToOrcValueWriter valueWriter = new
GenericRecordToOrcValueWriter(orcSchema, schema);
+ VectorizedRowBatch rowBatch = orcSchema.createRowBatch();
+
+ List<GenericRecord> recordList = GobblinOrcWriterTest
+ .deserializeAvroRecords(this.getClass(), schema,
"union_test/data.json");
+ for (GenericRecord record : recordList) {
+ valueWriter.write(record, rowBatch);
+ }
+
+ // Flush RowBatch into disk.
+ File tempFile = new File(Files.createTempDir(), "orc");
+ tempFile.deleteOnExit();
+ Path filePath = new Path(tempFile.getAbsolutePath());
+
+ OrcFile.WriterOptions options = OrcFile.writerOptions(new Properties(),
new Configuration());
+ options.setSchema(orcSchema);
+ Writer orcFileWriter = OrcFile.createWriter(filePath, options);
+ orcFileWriter.addRowBatch(rowBatch);
+ orcFileWriter.close();
+
+ // Load it back and compare.
+ FileSystem fs = FileSystem.get(new Configuration());
+ List<Writable> orcRecords = deserializeOrcRecords(filePath, fs);
+
+ Assert.assertEquals(orcRecords.size(), 5);
+
+ // Knowing all of them are OrcStruct<OrcUnion>, save the effort to
recursively convert GenericRecord to OrcStruct
+ // for comprehensive comparison which is non-trivial,
+ // although it is also theoretically possible and optimal way for doing
this unit test.
+ List<OrcUnion> unionList =
orcRecords.stream().map(this::getUnionFieldFromStruct).collect(Collectors.toList());
+
+ // Constructing all OrcUnion and verify all of them appears in unionList.
+ TypeDescription unionSchema = orcSchema.getChildren().get(0);
+ OrcUnion union_0 = new OrcUnion(unionSchema);
+ union_0.set((byte) 0, new Text("urn:li:member:3"));
+ Assert.assertTrue(unionList.contains(union_0));
+
+ OrcUnion union_1 = new OrcUnion(unionSchema);
+ union_1.set((byte) 0, new Text("urn:li:member:4"));
+ Assert.assertTrue(unionList.contains(union_1));
+
+ OrcUnion union_2 = new OrcUnion(unionSchema);
+ union_2.set((byte) 1, new IntWritable(2));
+ Assert.assertTrue(unionList.contains(union_2));
+
+ OrcUnion union_3 = new OrcUnion(unionSchema);
+ union_3.set((byte) 1, new IntWritable(1));
+ Assert.assertTrue(unionList.contains(union_3));
+
+ OrcUnion union_4 = new OrcUnion(unionSchema);
+ union_4.set((byte) 1, new IntWritable(3));
+ Assert.assertTrue(unionList.contains(union_4));
+ }
+
+ @Test
+ public void testListResize()
+ throws Exception {
+ Schema schema =
+ new
Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("list_map_test/schema.avsc"));
+
+ TypeDescription orcSchema = AvroOrcSchemaConverter.getOrcSchema(schema);
+ GenericRecordToOrcValueWriter valueWriter = new
GenericRecordToOrcValueWriter(orcSchema, schema);
+ // Make the batch size very small so that the enlarge behavior would
easily be triggered.
+ // But this has to more than the number of records that we deserialized
form data.json, as here we don't reset batch.
+ VectorizedRowBatch rowBatch = orcSchema.createRowBatch(10);
+
+ List<GenericRecord> recordList = GobblinOrcWriterTest
+ .deserializeAvroRecords(this.getClass(), schema,
"list_map_test/data.json");
+ Assert.assertEquals(recordList.size(), 6);
+ for (GenericRecord record : recordList) {
+ valueWriter.write(record, rowBatch);
+ }
+ // Examining resize count, which should happen only once for map and list,
so totally 2.
+ Assert.assertEquals(valueWriter.resizeCount, 2);
+ }
+
+ /**
+ * Accessing "fields" using reflection to work-around access modifiers.
+ */
+ private OrcUnion getUnionFieldFromStruct(Writable struct) {
+ try {
+ OrcStruct orcStruct = (OrcStruct) struct;
+ Field objectArr = OrcStruct.class.getDeclaredField("fields");
+ objectArr.setAccessible(true);
+ return (OrcUnion) ((Object[]) objectArr.get(orcStruct))[0];
+ } catch (Exception e) {
+ throw new RuntimeException("Cannot access with reflection", e);
+ }
+ }
+
+ public static final List<Writable> deserializeOrcRecords(Path orcFilePath,
FileSystem fs)
+ throws IOException {
+ org.apache.orc.Reader fileReader = OrcFile.createReader(orcFilePath, new
OrcFile.ReaderOptions(new Configuration()));
+ RecordReader recordReader = fileReader.rows();
+ TypeDescription schema = fileReader.getSchema();
+ VectorizedRowBatch batch = schema.createRowBatch();
+ recordReader.nextBatch(batch);
+ int rowInBatch = 0;
+
+ // result container
+ List<Writable> orcRecords = new ArrayList<>();
+
+ long rowCount = fileReader.getNumberOfRows();
+ while (rowCount > 0) {
+ // Deserialize records using Mapreduce-like API
+ if (schema.getCategory() == TypeDescription.Category.STRUCT) {
+ OrcStruct result = (OrcStruct)
OrcStruct.createValue(fileReader.getSchema());
+ List<TypeDescription> children = schema.getChildren();
+ int numberOfChildren = children.size();
+ for (int i = 0; i < numberOfChildren; ++i) {
+ result.setFieldValue(i, nextValue(batch.cols[i], rowInBatch,
children.get(i), result.getFieldValue(i)));
+ }
+ orcRecords.add(result);
+ } else {
+ throw new UnsupportedOperationException("The serialized records have
to be a struct in the outer-most layer.");
+ }
+ rowCount -= 1;
+ rowInBatch += 1;
+ }
+ return orcRecords;
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinOrcWriterTest.java
b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinOrcWriterTest.java
new file mode 100644
index 0000000..5a31530
--- /dev/null
+++
b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinOrcWriterTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.io.Writable;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.io.Closer;
+import com.google.common.io.Files;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+import static
org.apache.gobblin.writer.GenericRecordToOrcValueWriterTest.deserializeOrcRecords;
+import static org.mockito.Mockito.*;
+
+
+public class GobblinOrcWriterTest {
+
+ public static final List<GenericRecord> deserializeAvroRecords(Class clazz,
Schema schema, String schemaPath)
+ throws IOException {
+ List<GenericRecord> records = new ArrayList<>();
+
+ GenericDatumReader<GenericRecord> reader = new
GenericDatumReader<>(schema);
+
+ InputStream dataInputStream =
clazz.getClassLoader().getResourceAsStream(schemaPath);
+ Decoder decoder = DecoderFactory.get().jsonDecoder(schema,
dataInputStream);
+ GenericRecord recordContainer = reader.read(null, decoder);
+ ;
+ try {
+ while (recordContainer != null) {
+ records.add(recordContainer);
+ recordContainer = reader.read(null, decoder);
+ }
+ } catch (IOException ioe) {
+ dataInputStream.close();
+ }
+ return records;
+ }
+
+ @Test
+ public void testRowBatchDeepClean() throws Exception {
+ Schema schema = new Schema.Parser().parse(
+
this.getClass().getClassLoader().getResourceAsStream("orc_writer_list_test/schema.avsc"));
+ List<GenericRecord> recordList = deserializeAvroRecords(this.getClass(),
schema, "orc_writer_list_test/data.json");
+ // Mock WriterBuilder, bunch of mocking behaviors to work-around
precondition checks in writer builder
+ FsDataWriterBuilder<Schema, GenericRecord> mockBuilder =
+ (FsDataWriterBuilder<Schema, GenericRecord>)
Mockito.mock(FsDataWriterBuilder.class);
+ when(mockBuilder.getSchema()).thenReturn(schema);
+ State dummyState = new WorkUnit();
+ String stagingDir = Files.createTempDir().getAbsolutePath();
+ String outputDir = Files.createTempDir().getAbsolutePath();
+ dummyState.setProp(ConfigurationKeys.WRITER_STAGING_DIR, stagingDir);
+ dummyState.setProp(ConfigurationKeys.WRITER_FILE_PATH, "simple");
+ dummyState.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, outputDir);
+ dummyState.setProp("orcWriter.deepCleanBatch", "true");
+ when(mockBuilder.getFileName(dummyState)).thenReturn("file");
+
+ Closer closer = Closer.create();
+
+ GobblinOrcWriter orcWriter = closer.register(new
GobblinOrcWriter(mockBuilder, dummyState));
+ for (GenericRecord genericRecord : recordList) {
+ orcWriter.write(genericRecord);
+ }
+ // Manual trigger flush
+ orcWriter.flush();
+
+ Assert.assertNull(((BytesColumnVector) ((ListColumnVector)
orcWriter.rowBatch.cols[0]).child).vector);
+ Assert.assertNull(((BytesColumnVector) orcWriter.rowBatch.cols[1]).vector);
+ }
+
+ /**
+ * A basic unit for trivial writer correctness.
+ * TODO: A detailed test suite of ORC-writer for different sorts of schema:
+ */
+ @Test
+ public void testWrite() throws Exception {
+ Schema schema =
+ new
Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("orc_writer_test/schema.avsc"));
+ List<GenericRecord> recordList = deserializeAvroRecords(this.getClass(),
schema, "orc_writer_test/data.json");
+
+ // Mock WriterBuilder, bunch of mocking behaviors to work-around
precondition checks in writer builder
+ FsDataWriterBuilder<Schema, GenericRecord> mockBuilder =
+ (FsDataWriterBuilder<Schema, GenericRecord>)
Mockito.mock(FsDataWriterBuilder.class);
+ when(mockBuilder.getSchema()).thenReturn(schema);
+
+ State dummyState = new WorkUnit();
+ String stagingDir = Files.createTempDir().getAbsolutePath();
+ String outputDir = Files.createTempDir().getAbsolutePath();
+ dummyState.setProp(ConfigurationKeys.WRITER_STAGING_DIR, stagingDir);
+ dummyState.setProp(ConfigurationKeys.WRITER_FILE_PATH, "simple");
+ dummyState.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, outputDir);
+ when(mockBuilder.getFileName(dummyState)).thenReturn("file");
+ Path outputFilePath = new Path(outputDir, "simple/file");
+
+ // Having a closer to manage the life-cycle of the writer object.
+ // Will verify if scenarios like double-close could survive.
+ Closer closer = Closer.create();
+ GobblinOrcWriter orcWriter = closer.register(new
GobblinOrcWriter(mockBuilder, dummyState));
+
+ // Create one more writer to test fail-case.
+ GobblinOrcWriter orcFailWriter = new GobblinOrcWriter(mockBuilder,
dummyState);
+
+ for (GenericRecord record : recordList) {
+ orcWriter.write(record);
+ orcFailWriter.write(record);
+ }
+
+ // Not yet flushed or reaching default batch size, no records should have
been materialized.
+ Assert.assertEquals(orcWriter.recordsWritten(), 0);
+ Assert.assertEquals(orcFailWriter.recordsWritten(), 0);
+
+ // Try close, should catch relevant CloseBeforeFlushException
+ try {
+ orcFailWriter.close();
+ } catch (CloseBeforeFlushException e) {
+ Assert.assertEquals(e.datasetName, schema.getName());
+ }
+
+ orcWriter.commit();
+ Assert.assertEquals(orcWriter.recordsWritten(), 2);
+
+ // Verify ORC file contains correct records.
+ FileSystem fs = FileSystem.getLocal(new Configuration());
+ Assert.assertTrue(fs.exists(outputFilePath));
+ List<Writable> orcRecords = deserializeOrcRecords(outputFilePath, fs);
+ Assert.assertEquals(orcRecords.size(), 2);
+
+ // Double-close without protection of
org.apache.gobblinGobblinOrcWriter#closed
+ // leads to NPE within
org.apache.orc.impl.PhysicalFsWriter.writeFileMetadata. Try removing protection
condition
+ // in close method implementation if want to verify.
+ try {
+ closer.close();
+ } catch (NullPointerException npe) {
+ Assert.fail();
+ }
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-modules/gobblin-orc/src/test/resources/list_map_test/data.json
b/gobblin-modules/gobblin-orc/src/test/resources/list_map_test/data.json
new file mode 100644
index 0000000..ae448f4
--- /dev/null
+++ b/gobblin-modules/gobblin-orc/src/test/resources/list_map_test/data.json
@@ -0,0 +1,66 @@
+{
+ "ids": [
+ 1,
+ 2,
+ 3
+ ],
+ "maps": {
+ "a": 1,
+ "b": 2
+ }
+}
+{
+ "ids": [
+ 3,
+ 4,
+ 5
+ ],
+ "maps": {
+ "a": 1,
+ "b": 2
+ }
+}
+{
+ "ids": [
+ 1,
+ 2,
+ 3
+ ],
+ "maps": {
+ "a": 1,
+ "b": 2
+ }
+}
+{
+ "ids": [
+ 6,
+ 7,
+ 8
+ ],
+ "maps": {
+ "a": 1,
+ "b": 2
+ }
+}
+{
+ "ids": [
+ 1,
+ 2,
+ 3
+ ],
+ "maps": {
+ "a": 1,
+ "b": 2
+ }
+}
+{
+ "ids": [
+ 6,
+ 7,
+ 8
+ ],
+ "maps": {
+ "a": 1,
+ "b": 2
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-modules/gobblin-orc/src/test/resources/list_map_test/schema.avsc
b/gobblin-modules/gobblin-orc/src/test/resources/list_map_test/schema.avsc
new file mode 100644
index 0000000..491073d
--- /dev/null
+++ b/gobblin-modules/gobblin-orc/src/test/resources/list_map_test/schema.avsc
@@ -0,0 +1,21 @@
+{
+ "namespace": "com.linkedin.orc",
+ "type": "record",
+ "name": "ORCTest",
+ "fields": [
+ {
+ "name": "ids",
+ "type": {
+ "type": "array",
+ "items": "int"
+ }
+ },
+ {
+ "name": "maps",
+ "type": {
+ "type": "map",
+ "values": "int"
+ }
+ }
+ ]
+}
diff --git
a/gobblin-modules/gobblin-orc/src/test/resources/orc_writer_list_test/data.json
b/gobblin-modules/gobblin-orc/src/test/resources/orc_writer_list_test/data.json
new file mode 100644
index 0000000..0d706ff
--- /dev/null
+++
b/gobblin-modules/gobblin-orc/src/test/resources/orc_writer_list_test/data.json
@@ -0,0 +1,61 @@
+{
+ "things": [
+ "a",
+ "a",
+ "a",
+ "a",
+ "a",
+ "a",
+ "a",
+ "a",
+ "a",
+ "a",
+ "a",
+ "a",
+ "a",
+ "a",
+ "a",
+ "a",
+ "a",
+ "a",
+ "a",
+ "a",
+ "a",
+ "a",
+ "a",
+ "a"
+ ],
+ "name":
"AlyssahasaverylongnameAlyssahasaverylongnameAlyssahasaverylongnameAlyssahasaverylongnameAlyssahasaverylongname"
+}
+{
+ "things": [
+ "b",
+ "b",
+ "b",
+ "b",
+ "b",
+ "b",
+ "b",
+ "b",
+ "b",
+ "b",
+ "b",
+ "b",
+ "b",
+ "b",
+ "b",
+ "b",
+ "b",
+ "b",
+ "b",
+ "b",
+ "b",
+ "b",
+ "b",
+ "b",
+ "b",
+ "b",
+ "b"
+ ],
+ "name":
"BobHasaverylongnameBobHasaverylongnameBobHasaverylongnameBobHasaverylongnameBobHasaverylongname"
+}
\ No newline at end of file
diff --git
a/gobblin-modules/gobblin-orc/src/test/resources/orc_writer_list_test/schema.avsc
b/gobblin-modules/gobblin-orc/src/test/resources/orc_writer_list_test/schema.avsc
new file mode 100644
index 0000000..b0b430a
--- /dev/null
+++
b/gobblin-modules/gobblin-orc/src/test/resources/orc_writer_list_test/schema.avsc
@@ -0,0 +1,18 @@
+{
+ "namespace": "com.linkedin.orc",
+ "type": "record",
+ "name": "ORCTest",
+ "fields": [
+ {
+ "name": "things",
+ "type": {
+ "type": "array",
+ "items": "string"
+ }
+ },
+ {
+ "name": "name",
+ "type": "string"
+ }
+ ]
+}
diff --git
a/gobblin-modules/gobblin-orc/src/test/resources/orc_writer_test/data.json
b/gobblin-modules/gobblin-orc/src/test/resources/orc_writer_test/data.json
new file mode 100644
index 0000000..6a41a43
--- /dev/null
+++ b/gobblin-modules/gobblin-orc/src/test/resources/orc_writer_test/data.json
@@ -0,0 +1,8 @@
+{
+ "id": 1,
+ "name": "Alyssa"
+}
+{
+ "id": 2,
+ "name": "Bob"
+}
\ No newline at end of file
diff --git
a/gobblin-modules/gobblin-orc/src/test/resources/orc_writer_test/schema.avsc
b/gobblin-modules/gobblin-orc/src/test/resources/orc_writer_test/schema.avsc
new file mode 100644
index 0000000..d4f3347
--- /dev/null
+++ b/gobblin-modules/gobblin-orc/src/test/resources/orc_writer_test/schema.avsc
@@ -0,0 +1,15 @@
+{
+ "namespace": "com.linkedin.orc",
+ "type": "record",
+ "name": "ORCTest",
+ "fields": [
+ {
+ "name": "id",
+ "type": "int"
+ },
+ {
+ "name": "name",
+ "type": "string"
+ }
+ ]
+}
diff --git
a/gobblin-modules/gobblin-orc/src/test/resources/union_test/data.json
b/gobblin-modules/gobblin-orc/src/test/resources/union_test/data.json
new file mode 100644
index 0000000..d00ab32
--- /dev/null
+++ b/gobblin-modules/gobblin-orc/src/test/resources/union_test/data.json
@@ -0,0 +1,26 @@
+{
+ "id": {
+ "string": "urn:li:member:3"
+ }
+}
+{
+ "id": {
+ "string": "urn:li:member:4"
+ }
+}
+{
+ "id": {
+ "int": 2
+ }
+}
+{
+ "id": {
+ "int": 1
+ }
+}
+{
+ "id": {
+ "int": 3
+ }
+}
+
diff --git
a/gobblin-modules/gobblin-orc/src/test/resources/union_test/schema.avsc
b/gobblin-modules/gobblin-orc/src/test/resources/union_test/schema.avsc
new file mode 100644
index 0000000..fe0cdb1
--- /dev/null
+++ b/gobblin-modules/gobblin-orc/src/test/resources/union_test/schema.avsc
@@ -0,0 +1,15 @@
+{
+ "namespace": "com.linkedin.union",
+ "type": "record",
+ "name": "UnionTest",
+ "fields": [
+ {
+ "name": "id",
+ "type": [
+ "null",
+ "string",
+ "int"
+ ]
+ }
+ ]
+}
diff --git a/gradle/scripts/dependencyDefinitions.gradle
b/gradle/scripts/dependencyDefinitions.gradle
index 7c8931f..2ba9eb2 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -70,6 +70,7 @@ ext.externalDependency = [
"hiveMetastore": "org.apache.hive:hive-metastore:" + hiveVersion,
"hiveExec": "org.apache.hive:hive-exec:" + hiveVersion + ":core",
"hiveSerDe": "org.apache.hive:hive-serde:" + hiveVersion,
+ "hiveStorageApi": "org.apache.hive:hive-storage-api:2.4.0",
"httpclient": "org.apache.httpcomponents:httpclient:4.5.2",
"httpmime": "org.apache.httpcomponents:httpmime:4.5.2",
"httpcore": "org.apache.httpcomponents:httpcore:4.4.11",
@@ -171,6 +172,7 @@ ext.externalDependency = [
"grok": "io.thekraken:grok:0.1.5",
"hadoopAdl" : "org.apache.hadoop:hadoop-azure-datalake:3.0.0-alpha2",
"orcMapreduce":"org.apache.orc:orc-mapreduce:1.6.3",
+ "orcCore": "org.apache.orc:orc-core:1.6.3",
"orcTools":"org.apache.orc:orc-tools:1.6.3",
'parquet': 'org.apache.parquet:parquet-hadoop:1.10.1',
'parquetAvro': 'org.apache.parquet:parquet-avro:1.10.1',