This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 441d732 [GOBBLIN-695] Adding utility functions to generate Avro/ORC
binary using json
441d732 is described below
commit 441d732d272f92da7a1dedd81813f1945ff72b04
Author: Lei Sun <[email protected]>
AuthorDate: Thu Mar 7 13:21:59 2019 -0800
[GOBBLIN-695] Adding utility functions to generate Avro/ORC binary using
json
Closes #2566 from
autumnust/testToolForBinaryCreation
---
gobblin-binary-management/build.gradle | 54 +++
.../gobblin/binary_creation/AvroTestTools.java | 349 ++++++++++++++++++
.../gobblin/binary_creation/DataTestTools.java | 115 ++++++
.../gobblin/binary_creation/OrcTestTools.java | 407 +++++++++++++++++++++
.../gobblin/binary_creation/AvroTestToolsTest.java | 117 ++++++
.../gobblin/binary_creation/OrcTestToolsTest.java | 93 +++++
.../src/test/resources/avroWriterTest/data1.json | 2 +
.../src/test/resources/avroWriterTest/data2.json | 1 +
.../src/test/resources/avroWriterTest/schema.avsc | 25 ++
.../src/test/resources/orcWriterTest/data1.json | 1 +
.../src/test/resources/orcWriterTest/data2.json | 1 +
.../src/test/resources/orcWriterTest/schema.avsc | 15 +
gradle/scripts/dependencyDefinitions.gradle | 1 +
settings.gradle | 3 +-
14 files changed, 1183 insertions(+), 1 deletion(-)
diff --git a/gobblin-binary-management/build.gradle
b/gobblin-binary-management/build.gradle
new file mode 100644
index 0000000..be482cd
--- /dev/null
+++ b/gobblin-binary-management/build.gradle
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
apply plugin: 'java'

dependencies {
  compile project(":gobblin-api")
  compile project(":gobblin-utility")

  // Avro/ORC serialization stacks used by the binary-creation test tool-kits.
  compile externalDependency.avro
  compile externalDependency.avroMapredH2
  compile externalDependency.guava
  compile externalDependency.hadoopHdfs
  // Hadoop pieces needed only at execution time, not for compilation.
  runtime externalDependency.hadoopCommon
  runtime externalDependency.hadoopClientCore
  runtime externalDependency.hadoopAuth
  compile externalDependency.hiveMetastore
  compile externalDependency.hiveExec
  compile externalDependency.lombok
  compile externalDependency.orcMapreduce
  compile externalDependency.slf4j

  testCompile externalDependency.hamcrest
  testCompile externalDependency.testng
  testCompile externalDependency.mockito
  testCompile externalDependency.assertj
}

configurations {
  compile {
    transitive = true
  }
  archives
}

test {
  // Run tests from the repository root so classpath resources resolve consistently.
  workingDir rootProject.rootDir
}

// Marks this module as a library for Gobblin's build/publish scripts.
ext.classification="library"
diff --git
a/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/AvroTestTools.java
b/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/AvroTestTools.java
new file mode 100644
index 0000000..9a7bb1d
--- /dev/null
+++
b/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/AvroTestTools.java
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.binary_creation;
+
+import com.google.common.collect.AbstractIterator;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeMap;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.experimental.Delegate;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.mapred.FsInput;
+import org.apache.gobblin.util.FileListUtils;
+import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.filters.HiddenFilter;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.reflections.Reflections;
+import org.reflections.scanners.ResourcesScanner;
+import org.reflections.util.ConfigurationBuilder;
+
+
+/**
+ * An implementation of {@link DataTestTools} for Avro format.
+ */
+@Slf4j
+public class AvroTestTools extends DataTestTools<AvroTestTools.RecordIterator,
Schema> {
+
  /**
   * Verifies that {@code expected} and {@code observed} contain the same files (keys compared
   * after stripping the .avro/.json extension) holding the same records.
   *
   * <p>Note: this comparison is destructive — blacklisted fields are nulled out on the records
   * as they are consumed.
   *
   * @param expected expected records, keyed by file name.
   * @param observed observed records, keyed by file name.
   * @param allowDifferentOrder when true, records within a file are compared as sets.
   * @param blacklistRecordFields fields nulled out before record comparison.
   *        NOTE(review): with allowDifferentOrder=true this is passed straight to
   *        toSetWithBlacklistedFields, which iterates it without a null check — confirm callers
   *        never pass null together with allowDifferentOrder.
   * @param allowDifferentSchema when true, schemas are not compared and records are compared
   *        field-by-field regardless of schema attributes.
   * @return true when both maps hold the same files with the same records.
   */
  public boolean checkSameFilesAndRecords(TreeMap<String, RecordIterator> expected,
      TreeMap<String, RecordIterator> observed, boolean allowDifferentOrder,
      Collection<String> blacklistRecordFields, boolean allowDifferentSchema) {
    Iterator<String> keys1 = expected.navigableKeySet().iterator();
    Iterator<String> keys2 = observed.navigableKeySet().iterator();

    return compareIterators(keys1, keys2, (key1, key2) -> {
      // File names must match once extensions (.avro vs .json) are stripped.
      if (!removeExtension(key1).equals(removeExtension(key2))) {
        log.error(String.format("Mismatched files: %s and %s", key1, key2));
        return false;
      }

      RecordIterator it1 = expected.get(key1);
      RecordIterator it2 = observed.get(key2);

      if (!allowDifferentSchema && !it1.getSchema().equals(it2.getSchema())) {
        log.error(String.format("Mismatched schemas: %s and %s", key1, key2));
        return false;
      }

      if (allowDifferentOrder) {
        // Set comparison; records are wrapped in GenericRecordWrapper when schema differences
        // are allowed so that equality ignores schema attributes.
        Set r1 = allowDifferentSchema
            ? toSetWithBlacklistedFields(it1, blacklistRecordFields, GenericRecordWrapper::new)
            : toSetWithBlacklistedFields(it1, blacklistRecordFields, Function.identity());
        Set r2 = allowDifferentSchema
            ? toSetWithBlacklistedFields(it2, blacklistRecordFields, GenericRecordWrapper::new)
            : toSetWithBlacklistedFields(it2, blacklistRecordFields, Function.identity());
        if (r1.equals(r2)) {
          return true;
        } else {
          log.info("Sets of records differ.");
          return false;
        }
      } else {
        // Ordered comparison: null out blacklisted fields on both records, then compare.
        return compareIterators(it1, it2, (r1, r2) -> {
          if (blacklistRecordFields != null) {
            for (String blacklisted : blacklistRecordFields) {
              r1.put(blacklisted, null);
              r2.put(blacklisted, null);
            }
          }
          return allowDifferentSchema
              ? GenericRecordWrapper.compareGenericRecordRegardlessOfSchema(r1, r2)
              : r1.equals(r2);
        });
      }
    });
  }
+
+ private static <T> Set<T> toSetWithBlacklistedFields(Iterator<GenericRecord>
it,
+ Collection<String> blacklistRecordFields, Function<GenericRecord, T>
transform) {
+ return StreamSupport.stream(Spliterators.spliteratorUnknownSize(it, 0),
false).map(r -> {
+ for (String blacklisted : blacklistRecordFields) {
+ r.put(blacklisted, null);
+ }
+ return transform.apply(r);
+ }).collect(Collectors.toSet());
+ }
+
  /**
   * Read all avro records in an HDFS location into a map from file name to {@link RecordIterator}.
   *
   * @param fs file system holding the avro files.
   * @param path directory scanned recursively; hidden files are skipped.
   * @return map from path (relative to {@code path}) to a lazy iterator over that file's records;
   *         empty when {@code path} does not exist.
   */
  @Override
  public TreeMap<String, RecordIterator> readAllRecordsInBinaryDirectory(FileSystem fs, Path path)
      throws IOException {
    TreeMap<String, RecordIterator> output = new TreeMap<>();
    if (!fs.exists(path)) {
      return output;
    }
    PathFilter pathFilter = new HiddenFilter();
    for (FileStatus status : FileListUtils.listFilesRecursively(fs, path, pathFilter)) {
      SeekableInput sin = new FsInput(status.getPath(), fs);
      DataFileReader<GenericRecord> dfr = new DataFileReader<>(sin, new GenericDatumReader<>());

      String key = PathUtils.relativizePath(status.getPath(), path).toString();

      // The reader is closed lazily, only once the iterator is fully drained.
      // NOTE(review): a partially-consumed iterator leaks the reader — confirm callers drain it.
      output.put(key, new RecordIterator(dfr.getSchema(), new AbstractIterator<GenericRecord>() {
        @Override
        protected GenericRecord computeNext() {
          if (dfr.hasNext()) {
            return dfr.next();
          } else {
            try {
              dfr.close();
            } catch (IOException ioe) {
              log.error("Failed to close data file reader.", ioe);
            }
            endOfData();
            return null;
          }
        }
      }));
    }
    return output;
  }
+
  /**
   * Read all avro records in a json base resource in classpath into a map from file name to
   * {@link RecordIterator}.
   *
   * @param baseResource resource directory containing *.json data files (and optionally a
   *        schema.avsc per directory).
   * @param schema fallback schema; when null it is read from {@code baseResource}/schema.avsc.
   * @return map from file name (relative to {@code baseResource}) to its records.
   */
  @Override
  public TreeMap<String, RecordIterator> readAllRecordsInJsonResource(String baseResource, @Nullable Schema schema)
      throws IOException {
    if (schema == null) {
      String schemaResource = new File(baseResource, "schema.avsc").toString();
      schema = readAvscSchema(schemaResource, AvroTestTools.class);
    }

    TreeMap<String, RecordIterator> output = new TreeMap<>();
    for (String file : getJsonFileSetByResourceRootName(baseResource)) {
      log.info("Reading json record from " + file);
      String name = PathUtils.relativizePath(new Path(file), new Path(baseResource)).toString();

      // A schema.avsc sitting next to the json file takes precedence over the fallback schema.
      String schemaResourceName = new File(new File(file).getParent(), "schema.avsc").toString();
      Schema thisSchema = readAvscSchema(schemaResourceName, AvroTestTools.class);
      Schema actualSchema = thisSchema == null ? schema : thisSchema;

      // NOTE(review): the stream is closed when this try block exits, before the lazy
      // RecordIterator is consumed — this appears to rely on the json decoder having buffered
      // the (small) input; confirm behavior for large files.
      try (InputStream is = AvroTestTools.class.getClassLoader().getResourceAsStream(file)) {
        output.put(name,
            readRecordsFromJsonInputStream(actualSchema, is, DecoderFactory.get().jsonDecoder(actualSchema, is)));
      }
    }
    return output;
  }
+
  /**
   * Wraps a json {@link Decoder} into a lazy {@link RecordIterator} over {@code schema}.
   *
   * <p>End of input is detected by catching {@link IOException} from the datum reader (the json
   * decoder signals EOF via an IOException). NOTE(review): a genuine mid-stream I/O error is
   * therefore also treated as end-of-data — confirm this is acceptable for test tooling.
   */
  private static RecordIterator readRecordsFromJsonInputStream(Schema schema, InputStream is, Decoder decoder) {
    GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);

    return new RecordIterator(schema, new AbstractIterator<GenericRecord>() {
      @Override
      protected GenericRecord computeNext() {
        try {
          return reader.read(null, decoder);
        } catch (IOException ioe) {
          // Treat as end-of-stream; close the backing stream before signalling end of data.
          try {
            is.close();
          } catch (IOException exc) {
            log.warn("Failed to close input stream.", exc);
          }
          endOfData();
          return null;
        }
      }
    });
  }
+
+ /**
+ * Materialize records in a classpath package into HDFS avro records.
+ * @param baseResource name of the package. The package should contain the
following:
+ * - Exactly one resource called <name>.avsc containing
the schema of the records
+ * (or an explicit schema passed as an argument).
+ * - One or more data files called *.json containing the
records.
+ * @param fs the {@link FileSystem} where the records will be written.
+ * @param targetPath the path where the records will be written.
+ * @param schema Schema of the records, or null to read automatically from a
resource.
+ * @throws IOException
+ */
+
+ public Schema writeJsonResourceRecordsAsBinary(String baseResource,
FileSystem fs, Path targetPath,
+ @Nullable Schema schema) throws IOException {
+ TreeMap<String, RecordIterator> recordMap =
readAllRecordsInJsonResource(baseResource, schema);
+
+ Schema outputSchema = recordMap.lastEntry().getValue().getSchema();
+
+ for (Map.Entry<String, RecordIterator> entry : recordMap.entrySet()) {
+ writeAsAvroBinary(entry.getValue(), entry.getValue().getSchema(), fs,
new Path(targetPath,
+ removeExtension(entry.getKey()) + ".avro"));
+ }
+
+ return outputSchema;
+ }
+
+ /**
+ * Read schema from an avsc resource file.
+ */
+ public static Schema readAvscSchema(String resource, Class loadedClass)
throws IOException {
+ try (InputStream is =
loadedClass.getClassLoader().getResourceAsStream(resource)) {
+ return is != null ? new Schema.Parser().parse(is) : null;
+ }
+ }
+
+ private void writeAsAvroBinary(Iterator<GenericRecord> input, Schema schema,
FileSystem fs,
+ Path outputPath) throws IOException {
+
+ DataFileWriter writer = new DataFileWriter(new GenericDatumWriter());
+
+ writer.create(schema, fs.create(outputPath, true));
+ while (input.hasNext()) {
+ writer.append(input.next());
+ }
+ writer.close();
+
+ log.info("Successfully wrote avro file to path " + outputPath);
+ }
+
  /**
   * An iterator over {@link GenericRecord} which is also aware of the records' schema.
   */
  @AllArgsConstructor
  public static class RecordIterator implements Iterator<GenericRecord> {

    // Schema shared by every record this iterator produces.
    @Getter
    private final Schema schema;
    // Backing iterator; lombok @Delegate forwards hasNext()/next() to it.
    @Delegate
    private final Iterator<GenericRecord> it;
  }
+
+ /**
+ * A wrapper of {@link GenericRecord} when schema of record is not important
in comparison.
+ */
+ @AllArgsConstructor
+ public static class GenericRecordWrapper {
+ public GenericRecord record;
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ GenericRecordWrapper that = (GenericRecordWrapper) o;
+ return compareGenericRecordRegardlessOfSchema(record, that.record);
+ }
+
+ @Override
+ public int hashCode() {
+ // Getting value object array
+ int indexLen = record.getSchema().getFields().size();
+ Object[] objArr = new Object[indexLen];
+ for (int i = 0; i < indexLen; i++) {
+ objArr[i] = record.get(i);
+ }
+ return Objects.hash(objArr);
+ }
+
+ /**
+ * Compare two {@link GenericRecord} instance without considering their
schema.
+ * Useful when we want to compare two records by discarding some of fields
like header.
+ */
+ static boolean compareGenericRecordRegardlessOfSchema(GenericRecord r1,
GenericRecord r2) {
+ List<Schema.Field> listOfFields1 = r1.getSchema().getFields();
+ List<Schema.Field> listOfFields2 = r2.getSchema().getFields();
+
+ if (listOfFields1.size() != listOfFields2.size()) {
+ return false;
+ }
+
+ boolean result = true;
+ for (int i = 0; i < listOfFields1.size(); i++) {
+ result = result && (
+ ((r1.get(i) == null && r2.get(i) == null)
+ ||
(listOfFields1.get(i).name().equals(listOfFields2.get(i).name())
+ && (r1.get(i).equals(r2.get(i)))))
+ );
+ }
+
+ return result;
+ }
+ }
+
+ // Package-private methods shared by different format's tool-kit.
+ static String removeExtension(String string) {
+ if (string.endsWith(".avro") || string.endsWith(".json")) {
+ return string.substring(0, string.length() - 5);
+ }
+ throw new IllegalArgumentException("Only support avro and json
extensions.");
+ }
+
  /**
   * Scans the classpath for all *.json resources under {@code baseResource}.
   *
   * @param baseResource resource package to scan; also used as a name-prefix filter.
   * @return the set of matching resource names.
   */
  static Set<String> getJsonFileSetByResourceRootName(String baseResource) {
    Reflections reflections = new Reflections(new ConfigurationBuilder()
        .forPackages(baseResource)
        .filterInputsBy(name -> name.startsWith(baseResource))
        .setScanners(new ResourcesScanner()));

    return reflections.getResources(url -> url.endsWith(".json"));
  }
+
+ public static boolean isResourceExisted(String resource) throws IOException {
+ return AvroTestTools.class.getClassLoader().getResource(resource) != null;
+ }
+}
diff --git
a/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/DataTestTools.java
b/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/DataTestTools.java
new file mode 100644
index 0000000..45c2057
--- /dev/null
+++
b/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/DataTestTools.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.binary_creation;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.TreeMap;
+import java.util.function.BiFunction;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+/**
+ * A ToolKit that will be used for:
+ * - Creating binary-format file(Avro, ORC) using records declared in
txt(.json) file, and schema defined in .avsc file.
+ * - Deserializing binary-format file into traversable in-memory objects.
+ * - Verifying if contents in two binary-format file are identical with
certain constraints.
+ *
+ *
+ * @param <T> Iterator containing specific type of a record row,
+ * e.g. {@link org.apache.avro.generic.GenericRecord} for Avro.
+ * @param <S> Schema type of a specific data format.
+ */
+@Slf4j
+public abstract class DataTestTools<T, S> {
+ /**
+ * Verify that the two inputs contain the same records in the same file
names. Any fields listed in
+ * blacklistRecordFields will not be used for comparison.
+ * Note that this method is destructive to the input records.
+ * @param expected Expected records map, keyed by file name.
+ * @param observed Observed records map, keyed by file name
+ * @param allowDifferentOrder True if allowing fields arranged in different
order in comparison of two records.
+ * @param blacklistRecordFields Configurable set of fields that won't be
included for comparison of two records.
+ * @param allowDifferentSchema True if schema info (for avro, schema can
contain attributes which is not necessary
+ * to be included for comparison)
+ * @return
+ */
+ public abstract boolean checkSameFilesAndRecords(TreeMap<String, T>
expected, TreeMap<String, T> observed,
+ boolean allowDifferentOrder, Collection<String> blacklistRecordFields,
boolean allowDifferentSchema);
+
+ /**
+ * Write a resource file under a certain path as specified binary format
file, like Avro, ORC.
+ * @param baseResource Resource folder that contain JSON files.
+ * @param fs
+ * @param targetPath Output Path.
+ * @param schema The schema of outputed binary file
+ * @return
+ * @throws IOException
+ */
+ public abstract S writeJsonResourceRecordsAsBinary(String baseResource,
FileSystem fs, Path targetPath, S schema)
+ throws IOException;
+
+ /**
+ * Read all records in a json base resource in classpath into a map from
file name to iterator of T object.
+ * @param baseResource Base path of the resource directory that contains
json file.
+ * @param schema The schema of records.
+ * @return A map between file name to an iterator of objects contained in
path.
+ */
+ public abstract TreeMap<String, T> readAllRecordsInJsonResource(String
baseResource, S schema) throws IOException;
+
+ /**
+ * Read binary-format records into a map from file name to an iterator of T
object.
+ * @param fs File system object.
+ * @param path File path
+ * @return A map between file name to an iterator of objects contained in
path.
+ * @throws IOException
+ */
+ public abstract TreeMap<String, T>
readAllRecordsInBinaryDirectory(FileSystem fs, Path path) throws IOException;
+
+ /**
+ * Compare two iterators in T type.
+ */
+ <T> boolean compareIterators(Iterator<T> expected, Iterator<T> observed,
BiFunction<T, T, Boolean> comparator) {
+ while (expected.hasNext()) {
+ if (!observed.hasNext()) {
+ log.error("Expected has more elements than observed.");
+ return false;
+ }
+
+ T t1 = expected.next();
+ T t2 = observed.next();
+
+ boolean equals = comparator == null ? t1.equals(t2) :
comparator.apply(t1, t2);
+
+ if (!equals) {
+ log.error(String.format("Mismatch: %s does not equal %s.", t1, t2));
+ return false;
+ }
+ }
+
+ if (observed.hasNext()) {
+ log.error("Observed has more elements than expected.");
+ return false;
+ }
+
+ return true;
+ }
+}
diff --git
a/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/OrcTestTools.java
b/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/OrcTestTools.java
new file mode 100644
index 0000000..07c46ab
--- /dev/null
+++
b/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/OrcTestTools.java
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.binary_creation;
+
+import com.google.common.collect.AbstractIterator;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TreeMap;
+import javax.annotation.Nullable;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.experimental.Delegate;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.gobblin.util.FileListUtils;
+import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.filters.HiddenFilter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
+import org.apache.hadoop.hive.ql.io.orc.RecordReader;
+import org.apache.hadoop.hive.ql.io.orc.Writer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
+import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
+import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+import static org.apache.gobblin.binary_creation.AvroTestTools.*;
+
+
+// A class that examines ORC-format files; originally written for the Purger integration test.
+@Slf4j
+public class OrcTestTools extends DataTestTools<OrcTestTools.OrcRowIterator,
TypeInfo> {
+
+ /**
+ *
+ * @param expected
+ * @param observed
+ * @param allowDifferentOrder ORC tools will not use this parameter
currently.
+ * @param blacklistRecordFields ORC tools will not use this parameter
currently.
+ * @return If two sets of files are identical.
+ * Note that there might be an ordering issue in this comparison method.
When one is drafting an ORC integration
+ * test, try to name all json files differently.
+ */
+ @Override
+ public boolean checkSameFilesAndRecords(TreeMap<String, OrcRowIterator>
expected,
+ TreeMap<String, OrcRowIterator> observed, boolean allowDifferentOrder,
Collection<String> blacklistRecordFields,
+ boolean allowDifferentSchema) {
+ Iterator<String> keys1 = expected.navigableKeySet().iterator();
+ Iterator<String> keys2 = observed.navigableKeySet().iterator();
+
+ return compareIterators(keys1, keys2, (key1, key2) -> {
+ // ORC file doesn't have extension by Linkedin's convention.
+ if (!removeExtension(key1).equals(key2)) {
+ log.error(String.format("Mismatched files: %s and %s", key1, key2));
+ return false;
+ }
+
+ OrcRowIterator it1 = expected.get(key1);
+ OrcRowIterator it2 = observed.get(key2);
+
+ if (!it1.getTypeInfo().equals(it2.getTypeInfo())) {
+ log.error(String.format("Mismatched Typeinfo: %s and %s", key1, key2));
+ return false;
+ }
+
+ boolean result = true;
+ while (it1.hasNext()) {
+ if (!it2.hasNext() || !result) {
+ return false;
+ }
+ result = compareJavaRowAndOrcStruct(((AvroRow) it1.next()).getRow(),
(OrcStruct) it2.next());
+ }
+ return result;
+ });
+ }
+
+ /**
+ * Given the fact that we couldn't access OrcStruct easily, here uses the
hacky way(reflection)
+ * to go around access modifier for integration test purpose only.
+ * @param realRow A row containing a list of Java objects.
+ * @param struct An {@link OrcStruct} which essentially is a list of {@link
Writable} objects.
+ */
+ private boolean compareJavaRowAndOrcStruct(Object realRow, OrcStruct struct)
{
+ boolean isIdentical = true;
+ ArrayList<Object> javaObjRow = (ArrayList) realRow;
+
+ try {
+ Field objectArr = OrcStruct.class.getDeclaredField("fields");
+ objectArr.setAccessible(true);
+ Object[] dataArr = (Object[]) objectArr.get(struct);
+
+ int index = 0;
+ for (Object dataField : dataArr) {
+ if (dataField instanceof OrcStruct) {
+ isIdentical = isIdentical &&
compareJavaRowAndOrcStruct(javaObjRow.get(index), (OrcStruct) dataField);
+ } else {
+ isIdentical = isIdentical && objCastHelper(javaObjRow.get(index),
(Writable) dataField);
+ }
+ index++;
+ }
+ } catch (NoSuchFieldException | IllegalAccessException nfe) {
+ throw new RuntimeException("Failed in compare a java object row and
orcstruct");
+ }
+
+ return isIdentical;
+ }
+
+ /**
+ * All Writable objects passed in here are guaranteed to be primitive
writable objects.
+ */
+ private boolean objCastHelper(Object javaObj, Writable obj) {
+ if (obj instanceof IntWritable) {
+ return ((IntWritable) obj).get() == (Integer) javaObj;
+ } else if (obj instanceof Text) {
+ return (obj).toString().equals(javaObj);
+ } else if (obj instanceof LongWritable) {
+ return ((LongWritable) obj).get() == (Long) javaObj;
+ } else if (obj instanceof ShortWritable) {
+ return ((ShortWritable) obj).get() == (Short) javaObj;
+ } else if (obj instanceof DoubleWritable) {
+ return ((DoubleWritable) obj).get() == (Double) javaObj;
+ } else {
+ throw new RuntimeException("Cannot recognize the writable type, please
enrich the castHelper function");
+ }
+ }
+
  /**
   * Materialize records in a classpath package into HDFS ORC records.
   * @param baseResource name of the package. The package should contain the following:
   *                     - Exactly one resource called schema.avsc containing the schema of the
   *                       records (converted internally to an ORC TypeInfo).
   *                     - One or more data files called *.json containing the records.
   *
   * @param fs not used when writing (the writer is created from a fresh Configuration); kept for
   *           interface compatibility.
   * @param targetPath the path where the records will be written.
   * @param schema NOTE(review): this parameter is ignored — records are always read with the
   *               schema resolved from the resource tree; confirm that is intended.
   * @return the ORC TypeInfo of the written records.
   */
  @Override
  public TypeInfo writeJsonResourceRecordsAsBinary(String baseResource, @Nullable FileSystem fs, Path targetPath,
      @Nullable TypeInfo schema) throws IOException {
    TreeMap<String, OrcRowIterator> recordMap = readAllRecordsInJsonResource(baseResource, null);

    // NOTE(review): recordMap.lastEntry() NPEs when the resource contains no json files.
    TypeInfo outputSchema = recordMap.lastEntry().getValue().getTypeInfo();
    for (Map.Entry<String, OrcRowIterator> entry : recordMap.entrySet()) {
      // ORC output files carry no extension (LinkedIn convention).
      writeAsOrcBinary(entry.getValue(), outputSchema, new Path(targetPath, removeExtension(entry.getKey())));
    }

    return outputSchema;
  }
+
+ /**
+ * AvroRow version of writeAsOrcBinary
+ */
+ private void writeAsOrcBinary(OrcRowIterator input, TypeInfo schema, Path
outputPath) throws IOException {
+ Configuration configuration = new Configuration();
+
+ // Note that it doesn't support schema evolution at all.
+ // If the schema in realRow is inconsistent with given schema, writing
into disk
+ // would run into failure.
+ ObjectInspector oi =
TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(schema);
+ OrcFile.WriterOptions options =
OrcFile.writerOptions(configuration).inspector(oi);
+ Writer writer = null;
+
+ while (input.hasNext()) {
+ AvroRow avroRow = (AvroRow) input.next();
+ if (writer == null) {
+ options.inspector(avroRow.getInspector());
+ writer = OrcFile.createWriter(outputPath, options);
+ }
+ writer.addRow(avroRow.realRow);
+ }
+ if (writer != null) {
+ writer.close();
+ }
+ }
+
+ // ORC-File Reading related functions
+
+ // There's no GenericRecord for ORC existed(so that OrcStruct even doesn't
provide readFields as
+ // it is responsible to transform a Writable into GenericRecord in Avro
world.
+ @Override
+ public TreeMap<String, OrcRowIterator> readAllRecordsInJsonResource(String
baseResource,
+ @Nullable TypeInfo schema) throws IOException {
+ TypeInfo orcSchema;
+ try {
+ File schemaFile = new File(baseResource, "schema.avsc");
+ String schemaResource = schemaFile.toString();
+ orcSchema = convertAvroSchemaToOrcSchema(readAvscSchema(schemaResource,
OrcTestTools.class));
+ } catch (SerDeException se) {
+ throw new RuntimeException("Provided Avro Schema cannot be transformed
to ORC schema", se);
+ }
+
+ TreeMap<String, OrcRowIterator> output = new TreeMap<>();
+ for (String file : getJsonFileSetByResourceRootName(baseResource)) {
+ log.info("Reading json record from " + file);
+ String name = PathUtils.relativizePath(new Path(file), new
Path(baseResource)).toString();
+ output.put(name, readRecordsFromJsonInputStream(orcSchema, file));
+ }
+
+ return output;
+ }
+
+ public static class AvroRow implements Writable {
+ Object realRow;
+ ObjectInspector inspector;
+
+ public AvroRow(Object row, ObjectInspector inspector) {
+ this.realRow = row;
+ this.inspector = inspector;
+ }
+
+ @Override
+ public void write(DataOutput dataOutput) throws IOException {
+ throw new UnsupportedOperationException("can't write the bundle");
+ }
+
+ @Override
+ public void readFields(DataInput dataInput) throws IOException {
+ throw new UnsupportedOperationException("can't read the bundle");
+ }
+
+ ObjectInspector getInspector() {
+ return inspector;
+ }
+
+ Object getRow() {
+ return realRow;
+ }
+ }
+
+ /**
+ * Deserialize json object into a list of java object as a row, and
transform each of java object
+ * into {@link Writable} counterpart for constructing {@link OrcStruct}, in
convenience of Orc reading and writing.
+ *
+ * @param typeInfo The ORC schema in {@link TypeInfo} format.
+ * @param file The file name in String format.
+ * @return
+ */
+ private OrcRowIterator readRecordsFromJsonInputStream(TypeInfo typeInfo,
String file) throws IOException {
+
+ InputStream is =
OrcTestTools.class.getClassLoader().getResourceAsStream(file);
+
+
+ // This getParent.getParent is dirty due to we need to simulate
multiple-partitions scenarios in iTest.
+ String schemaResourceName = new File(new
File(file).getParentFile().getParent(), "schema.avsc").toString();
+
+ Schema attemptedSchema = readAvscSchema(schemaResourceName,
OrcTestTools.class);
+ final Schema avroSchema =
+ attemptedSchema == null ? readAvscSchema(new File(new
File(file).getParent(), "schema.avsc").toString(),
+ OrcTestTools.class) : attemptedSchema;
+
+ GenericDatumReader<GenericRecord> reader = new
GenericDatumReader<>(avroSchema);
+ Decoder decoder = DecoderFactory.get().jsonDecoder(avroSchema, is);
+
+ return new OrcRowIterator(typeInfo, new AbstractIterator<Writable>() {
+ @Override
+ protected Writable computeNext() {
+ try {
+ GenericRecord record = reader.read(null, decoder);
+ return getAvroWritable(record, avroSchema);
+ } catch (IOException e) {
+ try {
+ is.close();
+ } catch (IOException ioec) {
+ log.warn("Failed to read record from inputstream, will close it
immediately", ioec);
+ }
+ endOfData();
+ return null;
+ }
+ }
+ });
+ }
+
+ /**
+ * From each record, transformed to {@link AvroRow} object for writing.
+ * One can also choose to use OrcSerDe to obtain ORC-associated writable
object.
+ *
+ * Using return object of this method would enable a self-maintained ORC
writer(not from OrcOutputFormat)
+ * to write object.
+ */
+ private Writable getAvroWritable(GenericRecord record, Schema avroSchema) {
+ try {
+ // Construct AvroSerDe with proper schema and deserialize into Hive
object.
+ AvroSerDe serDe = new AvroSerDe();
+ Properties propertiesWithSchema = new Properties();
+
propertiesWithSchema.setProperty(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(),
+ avroSchema.toString());
+ serDe.initialize(null, propertiesWithSchema);
+ AvroGenericRecordWritable avroGenericRecordWritable = new
AvroGenericRecordWritable(record);
+ avroGenericRecordWritable.setFileSchema(avroSchema);
+ Object avroDeserialized = serDe.deserialize(avroGenericRecordWritable);
+ ObjectInspector avroOI = new
AvroObjectInspectorGenerator(avroSchema).getObjectInspector();
+
+ return new AvroRow(avroDeserialized, avroOI);
+ } catch (SerDeException se) {
+ throw new RuntimeException("Failed in SerDe exception:", se);
+ }
+ }
+
  /**
   * Reading ORC file into in-memory representation.
   *
   * Walks {@code path} recursively (skipping hidden files via {@link HiddenFilter}), opens each
   * ORC file, and exposes its rows lazily through an {@link OrcRowIterator} keyed by the file's
   * path relative to {@code path}. Returns an empty map when {@code path} does not exist.
   *
   * @param fs the filesystem holding the ORC files.
   * @param path directory (or file) to read; may be absent.
   * @return map from relative file path to a lazy row iterator for that file.
   */
  @Override
  public TreeMap<String, OrcRowIterator> readAllRecordsInBinaryDirectory(FileSystem fs, Path path) throws IOException {
    TreeMap<String, OrcRowIterator> output = new TreeMap<>();
    if (!fs.exists(path)) {
      return output;
    }
    PathFilter pathFilter = new HiddenFilter();
    for (FileStatus status : FileListUtils.listFilesRecursively(fs, path, pathFilter)) {
      String key = PathUtils.relativizePath(status.getPath(), path).toString();
      Reader orcReader = OrcFile.createReader(fs, status.getPath());
      RecordReader recordReader = orcReader.rows();

      // Schema is recovered from the file's own inspector; rows are pulled lazily so large
      // files are never fully materialized in memory.
      output.put(key, new OrcRowIterator(TypeInfoUtils.getTypeInfoFromObjectInspector(orcReader.getObjectInspector()),
          new AbstractIterator<Writable>() {
            @Override
            protected Writable computeNext() {
              try {
                if (recordReader.hasNext()) {
                  return (Writable) recordReader.next(null);
                } else {
                  // Exhausted: release the reader before signalling end of data.
                  recordReader.close();
                  endOfData();
                  return null;
                }
              } catch (IOException ioe) {
                // Best-effort iteration: a read error terminates this file's rows.
                log.warn("Failed to process orc record reader, will terminate reader immediately", ioe);
                endOfData();
                return null;
              }
            }
          }));
    }

    return output;
  }
+
+ /**
+ * An iterator over {@link GenericRecord} which is also aware of schema.
+ */
+ @AllArgsConstructor
+ public static class OrcRowIterator implements Iterator<Writable> {
+
+ @Getter
+ private final TypeInfo typeInfo;
+ @Delegate
+ private final Iterator<Writable> it;
+ }
+
+ /**
+ * Convert Avro schema into TypeInfo.
+ * Current version of Hive used by Gobblin open-source(1.0.1) doesn't have
{@link org.apache.orc.TypeDescription}
+ * and utilities associated with it. So instead {@link TypeInfo} is being
used to represent Orc schema.
+ * Note that {@link TypeInfo} is not case preserving as it is actually the
internal schema representation of Hive.
+ */
+ public static TypeInfo convertAvroSchemaToOrcSchema(Schema avroSchema)
throws SerDeException {
+ return TypeInfoUtils.getTypeInfoFromObjectInspector(
+ new AvroObjectInspectorGenerator(avroSchema).getObjectInspector());
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-binary-management/src/test/java/org/apache/gobblin/binary_creation/AvroTestToolsTest.java
b/gobblin-binary-management/src/test/java/org/apache/gobblin/binary_creation/AvroTestToolsTest.java
new file mode 100644
index 0000000..22004c0
--- /dev/null
+++
b/gobblin-binary-management/src/test/java/org/apache/gobblin/binary_creation/AvroTestToolsTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.binary_creation;
+
+import com.google.common.io.Files;
+import java.io.File;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import org.apache.gobblin.binary_creation.AvroTestTools.*;
+
+
+public class AvroTestToolsTest {
+
+ @Test
+ public void test() throws Exception {
+ DataTestTools testTools = new AvroTestTools();
+
+ String resourceName = "avroWriterTest";
+
+ File tmpDir = Files.createTempDir();
+
+ FileSystem fs = FileSystem.getLocal(new Configuration());
+ Path output = new Path(tmpDir.getAbsolutePath(), "test");
+
+ testTools.writeJsonResourceRecordsAsBinary(resourceName, fs, output, null);
+
+
Assert.assertTrue(testTools.checkSameFilesAndRecords(testTools.readAllRecordsInJsonResource(resourceName,
null),
+ testTools.readAllRecordsInBinaryDirectory(fs, output), false, null,
true));
+ }
+
+ @Test
+ public void testGenericRecordDataComparisonWithoutSchema() throws Exception {
+ Schema avroSchema = (new Schema.Parser()).parse(
+ "{\n" + " \"namespace\": \"com.linkedin.compliance.test\",\n" + "
\"type\": \"record\",\n"
+ + " \"name\": \"SimpleTest\",\n" + " \"fields\": [\n" + "
{\n" + " \"name\": \"memberId\",\n"
+ + " \"type\": \"int\"\n" + " },\n" + " {\n" + "
\"name\": \"name\",\n"
+ + " \"type\": \"string\"\n" + " }\n" + " ]\n" + "}");
+
+ Schema avroSchemaDiffInNamespace = (new Schema.Parser()).parse(
+ "{\n" + " \"namespace\": \"com.linkedin.whatever\",\n" + " \"type\":
\"record\",\n"
+ + " \"name\": \"SimpleTest\",\n" + " \"fields\": [\n" + "
{\n" + " \"name\": \"memberId\",\n"
+ + " \"type\": \"int\"\n" + " },\n" + " {\n" + "
\"name\": \"name\",\n"
+ + " \"type\": \"string\"\n" + " }\n" + " ]\n" + "}");
+
+ Schema nullableSchema = (new Schema.Parser()).parse(
+ "{\n" + " \"namespace\": \"com.linkedin.compliance.test\",\n" + "
\"type\": \"record\",\n"
+ + " \"name\": \"SimpleTest\",\n" + " \"fields\": [\n" + "
{\n" + " \"name\": \"memberId\",\n"
+ + " \"type\": [\n" + " \"null\",\n" + "
\"int\",\n" + " \"string\"\n"
+ + " ]\n" + " },\n" + " {\n" + " \"name\":
\"name\",\n" + " \"type\": \"string\"\n"
+ + " }\n" + " ]\n" + "}");
+
+ GenericRecordBuilder builder_0 = new GenericRecordBuilder(avroSchema);
+ builder_0.set("memberId", "1");
+ builder_0.set("name", "alice");
+ GenericData.Record record_0 = builder_0.build();
+
+ GenericRecordBuilder builder_1 = new
GenericRecordBuilder(avroSchemaDiffInNamespace);
+ builder_1.set("memberId", "1");
+ builder_1.set("name", "alice");
+ GenericData.Record record_1 = builder_1.build();
+
+ GenericRecordBuilder builder_2 = new
GenericRecordBuilder(avroSchemaDiffInNamespace);
+ builder_2.set("memberId", "1");
+ builder_2.set("name", "alice");
+ GenericData.Record record_2 = builder_2.build();
+
+ GenericRecordBuilder builder_3 = new
GenericRecordBuilder(avroSchemaDiffInNamespace);
+ builder_3.set("memberId", "2");
+ builder_3.set("name", "bob");
+ GenericData.Record record_3 = builder_3.build();
+
+ GenericRecordBuilder builder_4 = new GenericRecordBuilder(nullableSchema);
+ builder_4.set("memberId", null);
+ builder_4.set("name", "bob");
+ GenericData.Record record_4 = builder_4.build();
+
+ GenericRecordBuilder builder_5 = new GenericRecordBuilder(nullableSchema);
+ builder_5.set("memberId", null);
+ builder_5.set("name", "bob");
+ GenericData.Record record_5 = builder_5.build();
+
+ Assert.assertTrue(!record_0.equals(record_1));
+
+ AvroTestTools.GenericRecordWrapper wrapper_0 = new
GenericRecordWrapper(record_0);
+ GenericRecordWrapper wrapper_1 = new GenericRecordWrapper(record_1);
+ GenericRecordWrapper wrapper_2 = new GenericRecordWrapper(record_2);
+ GenericRecordWrapper wrapper_3 = new GenericRecordWrapper(record_3);
+ GenericRecordWrapper wrapper_4 = new GenericRecordWrapper(record_4);
+ GenericRecordWrapper wrapper_5 = new GenericRecordWrapper(record_5);
+
+ Assert.assertEquals(wrapper_0, wrapper_1);
+ Assert.assertEquals(wrapper_1, wrapper_2);
+ Assert.assertNotSame(wrapper_2, wrapper_3);
+ Assert.assertEquals(wrapper_4, wrapper_5);
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-binary-management/src/test/java/org/apache/gobblin/binary_creation/OrcTestToolsTest.java
b/gobblin-binary-management/src/test/java/org/apache/gobblin/binary_creation/OrcTestToolsTest.java
new file mode 100644
index 0000000..49a0e79
--- /dev/null
+++
b/gobblin-binary-management/src/test/java/org/apache/gobblin/binary_creation/OrcTestToolsTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.binary_creation;
+
+import com.google.common.io.Files;
+import java.io.File;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.orc.TypeDescription;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class OrcTestToolsTest {
+
+ public DataTestTools orcTools = new OrcTestTools();;
+
+ @Test
+ public void test() throws Exception {
+
+ String resourceName = "orcWriterTest";
+
+ File tmpDir = Files.createTempDir();
+
+ FileSystem fs = FileSystem.get(new Configuration());
+
+ Path output = new Path(tmpDir.getAbsolutePath(), "test");
+
+ orcTools.writeJsonResourceRecordsAsBinary(resourceName, null, output,
null);
+
+
Assert.assertTrue(orcTools.checkSameFilesAndRecords(orcTools.readAllRecordsInJsonResource(resourceName,
null),
+ orcTools.readAllRecordsInBinaryDirectory(fs, output), true, null,
false));
+ }
+
+ @Test
+ public void testSchemaToTypeInfoConversion() throws Exception {
+ // Simple non-nested case:
+ Schema avroSchema = SchemaBuilder.record("test")
+ .fields()
+ .name("id")
+ .type()
+ .intType()
+ .noDefault()
+ .name("timestamp")
+ .type()
+ .stringType()
+ .noDefault()
+ .endRecord();
+
+ TypeInfo orcSchema = OrcTestTools.convertAvroSchemaToOrcSchema(avroSchema);
+ String targetOrcSchemaString = "struct<id:int,timestamp:string>";
+ Assert.assertEquals(targetOrcSchemaString, orcSchema.toString());
+
+ // Nested case:
+ avroSchema = SchemaBuilder.record("nested")
+ .fields()
+ .name("nestedId")
+ .type()
+ .array()
+ .items()
+ .stringType()
+ .noDefault()
+ .name("timestamp")
+ .type()
+ .stringType()
+ .noDefault()
+ .endRecord();
+ orcSchema = OrcTestTools.convertAvroSchemaToOrcSchema(avroSchema);
+ TypeDescription targetTypeDescription = TypeDescription.createStruct()
+ .addField("nestedId",
TypeDescription.createList(TypeDescription.createString()))
+ .addField("timestamp", TypeDescription.createString());
+ Assert.assertEquals(orcSchema.toString().toLowerCase(),
targetTypeDescription.toString().toLowerCase());
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-binary-management/src/test/resources/avroWriterTest/data1.json
b/gobblin-binary-management/src/test/resources/avroWriterTest/data1.json
new file mode 100644
index 0000000..59be71c
--- /dev/null
+++ b/gobblin-binary-management/src/test/resources/avroWriterTest/data1.json
@@ -0,0 +1,2 @@
+{"name": "Alyssa", "favorite_number": {"int": 256}, "favorite_color": null}
+{"name": "Ben", "favorite_number": {"int": 7}, "favorite_color": {"string":
"red"}}
diff --git
a/gobblin-binary-management/src/test/resources/avroWriterTest/data2.json
b/gobblin-binary-management/src/test/resources/avroWriterTest/data2.json
new file mode 100644
index 0000000..ead75c4
--- /dev/null
+++ b/gobblin-binary-management/src/test/resources/avroWriterTest/data2.json
@@ -0,0 +1 @@
+{"name": "Charlie", "favorite_number": null, "favorite_color": {"string":
"blue"}}
diff --git
a/gobblin-binary-management/src/test/resources/avroWriterTest/schema.avsc
b/gobblin-binary-management/src/test/resources/avroWriterTest/schema.avsc
new file mode 100644
index 0000000..8d88e5e
--- /dev/null
+++ b/gobblin-binary-management/src/test/resources/avroWriterTest/schema.avsc
@@ -0,0 +1,25 @@
+{
+ "namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {
+ "name": "name",
+ "type": "string"
+ },
+ {
+ "name": "favorite_number",
+ "type": [
+ "null",
+ "int"
+ ]
+ },
+ {
+ "name": "favorite_color",
+ "type": [
+ "null",
+ "string"
+ ]
+ }
+ ]
+}
diff --git
a/gobblin-binary-management/src/test/resources/orcWriterTest/data1.json
b/gobblin-binary-management/src/test/resources/orcWriterTest/data1.json
new file mode 100644
index 0000000..17fdea9
--- /dev/null
+++ b/gobblin-binary-management/src/test/resources/orcWriterTest/data1.json
@@ -0,0 +1 @@
+{"memberId": 1, "name": "Alyssa"}
\ No newline at end of file
diff --git
a/gobblin-binary-management/src/test/resources/orcWriterTest/data2.json
b/gobblin-binary-management/src/test/resources/orcWriterTest/data2.json
new file mode 100644
index 0000000..a798165
--- /dev/null
+++ b/gobblin-binary-management/src/test/resources/orcWriterTest/data2.json
@@ -0,0 +1 @@
+{"memberId": 2, "name": "Bob"}
\ No newline at end of file
diff --git
a/gobblin-binary-management/src/test/resources/orcWriterTest/schema.avsc
b/gobblin-binary-management/src/test/resources/orcWriterTest/schema.avsc
new file mode 100644
index 0000000..f9fdf51
--- /dev/null
+++ b/gobblin-binary-management/src/test/resources/orcWriterTest/schema.avsc
@@ -0,0 +1,15 @@
+{
+ "namespace": "com.linkedin.compliance.test",
+ "type": "record",
+ "name": "SimpleTest",
+ "fields": [
+ {
+ "name": "memberId",
+ "type": "int"
+ },
+ {
+ "name": "name",
+ "type": "string"
+ }
+ ]
+}
diff --git a/gradle/scripts/dependencyDefinitions.gradle
b/gradle/scripts/dependencyDefinitions.gradle
index 0009397..f669316 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -167,6 +167,7 @@ ext.externalDependency = [
"opencsv": "com.opencsv:opencsv:3.8",
"grok": "io.thekraken:grok:0.1.5",
"hadoopAdl" : "org.apache.hadoop:hadoop-azure-datalake:3.0.0-alpha2",
+ "orcMapreduce":"org.apache.orc:orc-mapreduce:1.5.4",
'parquet': 'com.twitter:parquet-hadoop-bundle:1.5.0',
'reactivex': 'io.reactivex.rxjava2:rxjava:2.1.0',
"slf4j": [
diff --git a/settings.gradle b/settings.gradle
index 629345c..f1ceb05 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -41,7 +41,8 @@ def modules = ['gobblin-admin',
'gobblin-cluster',
'gobblin-aws',
'gobblin-service',
- 'gobblin-test-utils']
+ 'gobblin-test-utils',
+ 'gobblin-binary-management']
// Disable jacoco for now as Kafka 0.8 is the default version and jacoco does
not like the same classes
// being declared in different modules