This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 441d732  [GOBBLIN-695] Adding utility functions to generate Avro/ORC 
binary using json
441d732 is described below

commit 441d732d272f92da7a1dedd81813f1945ff72b04
Author: Lei Sun <[email protected]>
AuthorDate: Thu Mar 7 13:21:59 2019 -0800

    [GOBBLIN-695] Adding utility functions to generate Avro/ORC binary using 
json
    
    Closes #2566 from
    autumnust/testToolForBinaryCreation
---
 gobblin-binary-management/build.gradle             |  54 +++
 .../gobblin/binary_creation/AvroTestTools.java     | 349 ++++++++++++++++++
 .../gobblin/binary_creation/DataTestTools.java     | 115 ++++++
 .../gobblin/binary_creation/OrcTestTools.java      | 407 +++++++++++++++++++++
 .../gobblin/binary_creation/AvroTestToolsTest.java | 117 ++++++
 .../gobblin/binary_creation/OrcTestToolsTest.java  |  93 +++++
 .../src/test/resources/avroWriterTest/data1.json   |   2 +
 .../src/test/resources/avroWriterTest/data2.json   |   1 +
 .../src/test/resources/avroWriterTest/schema.avsc  |  25 ++
 .../src/test/resources/orcWriterTest/data1.json    |   1 +
 .../src/test/resources/orcWriterTest/data2.json    |   1 +
 .../src/test/resources/orcWriterTest/schema.avsc   |  15 +
 gradle/scripts/dependencyDefinitions.gradle        |   1 +
 settings.gradle                                    |   3 +-
 14 files changed, 1183 insertions(+), 1 deletion(-)

diff --git a/gobblin-binary-management/build.gradle 
b/gobblin-binary-management/build.gradle
new file mode 100644
index 0000000..be482cd
--- /dev/null
+++ b/gobblin-binary-management/build.gradle
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply plugin: 'java'
+
+dependencies {
+  compile project(":gobblin-api")
+  compile project(":gobblin-utility")
+
+  compile externalDependency.avro
+  compile externalDependency.avroMapredH2
+  compile externalDependency.guava
+  compile externalDependency.hadoopHdfs
+  runtime externalDependency.hadoopCommon
+  runtime externalDependency.hadoopClientCore
+  runtime externalDependency.hadoopAuth
+  compile externalDependency.hiveMetastore
+  compile externalDependency.hiveExec
+  compile externalDependency.lombok
+  compile externalDependency.orcMapreduce
+  compile externalDependency.slf4j
+
+  testCompile externalDependency.hamcrest
+  testCompile externalDependency.testng
+  testCompile externalDependency.mockito
+  testCompile externalDependency.assertj
+}
+
+configurations {
+    compile {
+        transitive = true
+    }
+    archives
+}
+
+test {
+    workingDir rootProject.rootDir
+}
+
+ext.classification="library"
diff --git 
a/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/AvroTestTools.java
 
b/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/AvroTestTools.java
new file mode 100644
index 0000000..9a7bb1d
--- /dev/null
+++ 
b/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/AvroTestTools.java
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.binary_creation;
+
+import com.google.common.collect.AbstractIterator;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeMap;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.experimental.Delegate;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.mapred.FsInput;
+import org.apache.gobblin.util.FileListUtils;
+import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.filters.HiddenFilter;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.reflections.Reflections;
+import org.reflections.scanners.ResourcesScanner;
+import org.reflections.util.ConfigurationBuilder;
+
+
+/**
+ * An implementation of {@link DataTestTools} for the Avro format.
+ */
+@Slf4j
+public class AvroTestTools extends DataTestTools<AvroTestTools.RecordIterator, 
Schema> {
+
+  /**
+   * Verify that two maps of Avro files contain the same file names and the same records.
+   *
+   * <p>Files are paired in key order; two keys match when they are equal after their extension is removed.
+   * Blacklisted fields are overwritten with {@code null} in the records themselves before comparing, so this
+   * method is destructive to its inputs and consumes the iterators.
+   *
+   * @param expected expected records, keyed by file name.
+   * @param observed observed records, keyed by file name.
+   * @param allowDifferentOrder if true the records of each file are compared as sets, ignoring their order.
+   * @param blacklistRecordFields names of fields excluded from the comparison.
+   * @param allowDifferentSchema if true schemas are not compared and records are matched by field name and value only.
+   * @return true if both maps hold the same files with the same records.
+   */
+  @Override
+  public boolean checkSameFilesAndRecords(TreeMap<String, RecordIterator> expected,
+      TreeMap<String, RecordIterator> observed, boolean allowDifferentOrder, Collection<String> blacklistRecordFields,
+      boolean allowDifferentSchema) {
+    Iterator<String> keys1 = expected.navigableKeySet().iterator();
+    Iterator<String> keys2 = observed.navigableKeySet().iterator();
+
+    return compareIterators(keys1, keys2, (key1, key2) -> {
+      if (!removeExtension(key1).equals(removeExtension(key2))) {
+        log.error(String.format("Mismatched files: %s and %s", key1, key2));
+        return false;
+      }
+
+      RecordIterator it1 = expected.get(key1);
+      RecordIterator it2 = observed.get(key2);
+
+      if (!allowDifferentSchema && !it1.getSchema().equals(it2.getSchema())) {
+        log.error(String.format("Mismatched schemas: %s and %s", key1, key2));
+        return false;
+      }
+
+      if (allowDifferentOrder) {
+        // Wrap the records when schemas may differ so that set membership ignores the schema.
+        Function<GenericRecord, Object> transform =
+            allowDifferentSchema ? GenericRecordWrapper::new : record -> record;
+        Set<Object> expectedRecords = toSetWithBlacklistedFields(it1, blacklistRecordFields, transform);
+        Set<Object> observedRecords = toSetWithBlacklistedFields(it2, blacklistRecordFields, transform);
+        if (expectedRecords.equals(observedRecords)) {
+          return true;
+        }
+        log.info("Sets of records differ.");
+        return false;
+      }
+
+      return compareIterators(it1, it2, (r1, r2) -> {
+        if (blacklistRecordFields != null) {
+          for (String blacklisted : blacklistRecordFields) {
+            r1.put(blacklisted, null);
+            r2.put(blacklisted, null);
+          }
+        }
+        return allowDifferentSchema
+            ? GenericRecordWrapper.compareGenericRecordRegardlessOfSchema(r1, r2)
+            : r1.equals(r2);
+      });
+    });
+  }
+
+  /**
+   * Drain an iterator of records into a set, blanking the blacklisted fields of every record first.
+   *
+   * @param it records to collect; the iterator is fully consumed and its records are modified in place.
+   * @param blacklistRecordFields names of fields to overwrite with {@code null}; may be null for "no blacklist".
+   * @param transform conversion applied to each record before it is added to the set.
+   * @return the set of transformed records.
+   */
+  private static <T> Set<T> toSetWithBlacklistedFields(Iterator<GenericRecord> it,
+      @Nullable Collection<String> blacklistRecordFields, Function<GenericRecord, T> transform) {
+    return StreamSupport.stream(Spliterators.spliteratorUnknownSize(it, 0), false).map(r -> {
+      // The ordered comparison path accepts a null blacklist, so accept it here as well instead of throwing.
+      if (blacklistRecordFields != null) {
+        for (String blacklisted : blacklistRecordFields) {
+          r.put(blacklisted, null);
+        }
+      }
+      return transform.apply(r);
+    }).collect(Collectors.toSet());
+  }
+
+  /**
+   * Read all avro records in an HDFS location into a map from file name to {@link RecordIterator}.
+   *
+   * <p>Files are listed recursively under {@code path} using a {@link HiddenFilter}; each map key is the file
+   * path relativized against {@code path}. Records are read lazily: every returned iterator keeps its
+   * {@link DataFileReader} open and closes it only once the iterator has been fully consumed.
+   *
+   * @param fs the {@link FileSystem} holding the avro files.
+   * @param path root directory to scan; a non-existent path yields an empty map.
+   * @return a sorted map from relative file name to the records (and schema) of that file.
+   * @throws IOException if listing the directory or opening a file fails.
+   */
+  @Override
+  public TreeMap<String, RecordIterator> readAllRecordsInBinaryDirectory(FileSystem fs, Path path)
+      throws IOException {
+    TreeMap<String, RecordIterator> output = new TreeMap<>();
+    if (!fs.exists(path)) {
+      return output;
+    }
+    PathFilter pathFilter = new HiddenFilter();
+    for (FileStatus status : FileListUtils.listFilesRecursively(fs, path, pathFilter)) {
+      SeekableInput sin = new FsInput(status.getPath(), fs);
+      DataFileReader<GenericRecord> dfr;
+      try {
+        dfr = new DataFileReader<>(sin, new GenericDatumReader<>());
+      } catch (IOException | RuntimeException e) {
+        // The reader never took ownership of the input, so release it here instead of leaking it.
+        sin.close();
+        throw e;
+      }
+
+      String key = PathUtils.relativizePath(status.getPath(), path).toString();
+
+      output.put(key, new RecordIterator(dfr.getSchema(), new AbstractIterator<GenericRecord>() {
+        @Override
+        protected GenericRecord computeNext() {
+          if (dfr.hasNext()) {
+            return dfr.next();
+          } else {
+            try {
+              dfr.close();
+            } catch (IOException ioe) {
+              log.error("Failed to close data file reader.", ioe);
+            }
+            endOfData();
+            return null;
+          }
+        }
+      }));
+    }
+    return output;
+  }
+
+  /**
+   * Read all avro records in a json base resource in classpath into a map from file name to
+   * {@link RecordIterator}.
+   *
+   * <p>For every json file the schema is taken from a {@code schema.avsc} resource next to that file when one
+   * exists; otherwise the {@code schema} argument is used, which itself defaults to
+   * {@code <baseResource>/schema.avsc} when null.
+   *
+   * <p>The returned iterators are lazy, so each input stream is left open here; the iterator closes it when it
+   * stops reading.
+   *
+   * @param baseResource classpath directory that contains the json files.
+   * @param schema fallback schema, or null to load {@code schema.avsc} from {@code baseResource}.
+   * @return a sorted map from file name (relative to {@code baseResource}) to the records of that file.
+   * @throws IOException if a resource cannot be opened or no schema can be resolved for a file.
+   */
+  @Override
+  public TreeMap<String, RecordIterator> readAllRecordsInJsonResource(String baseResource, @Nullable Schema schema)
+      throws IOException {
+    if (schema == null) {
+      String schemaResource = new File(baseResource, "schema.avsc").toString();
+      schema = readAvscSchema(schemaResource, AvroTestTools.class);
+    }
+
+    TreeMap<String, RecordIterator> output = new TreeMap<>();
+    for (String file : getJsonFileSetByResourceRootName(baseResource)) {
+      log.info("Reading json record from " + file);
+      String name = PathUtils.relativizePath(new Path(file), new Path(baseResource)).toString();
+
+      String schemaResourceName = new File(new File(file).getParent(), "schema.avsc").toString();
+      Schema thisSchema = readAvscSchema(schemaResourceName, AvroTestTools.class);
+      Schema actualSchema = thisSchema == null ? schema : thisSchema;
+      if (actualSchema == null) {
+        throw new IOException("No schema.avsc found for json resource " + file);
+      }
+
+      InputStream is = AvroTestTools.class.getClassLoader().getResourceAsStream(file);
+      if (is == null) {
+        throw new IOException("Cannot open json resource " + file);
+      }
+      try {
+        // Do not close the stream on the success path: records are decoded lazily by the returned iterator.
+        // Closing it here would make any data not already buffered by the decoder unreadable.
+        output.put(name,
+            readRecordsFromJsonInputStream(actualSchema, is, DecoderFactory.get().jsonDecoder(actualSchema, is)));
+      } catch (IOException | RuntimeException e) {
+        is.close();
+        throw e;
+      }
+    }
+    return output;
+  }
+
+  private static RecordIterator readRecordsFromJsonInputStream(Schema schema, 
InputStream is, Decoder decoder) {
+    GenericDatumReader<GenericRecord> reader = new 
GenericDatumReader<>(schema);
+
+    return new RecordIterator(schema, new AbstractIterator<GenericRecord>() {
+      @Override
+      protected GenericRecord computeNext() {
+        try {
+          return reader.read(null, decoder);
+        } catch (IOException ioe) {
+          try {
+            is.close();
+          } catch (IOException exc) {
+            log.warn("Failed to close input stream.", exc);
+          }
+          endOfData();
+          return null;
+        }
+      }
+    });
+  }
+
+  /**
+   * Materialize records in a classpath package into HDFS avro records.
+   * @param baseResource name of the package. The package should contain the following:
+   *                     - A resource called schema.avsc containing the schema of the records
+   *                       (or an explicit schema passed as an argument).
+   *                     - One or more data files called *.json containing the records.
+   * @param fs the {@link FileSystem} where the records will be written.
+   * @param targetPath the directory where the records will be written; each json file becomes one .avro file.
+   * @param schema Schema of the records, or null to read automatically from a resource.
+   * @return the schema of the last file (in key order) that was written.
+   * @throws IOException if no json resource is found, or if reading or writing fails.
+   */
+  @Override
+  public Schema writeJsonResourceRecordsAsBinary(String baseResource, FileSystem fs, Path targetPath,
+      @Nullable Schema schema) throws IOException {
+    TreeMap<String, RecordIterator> recordMap = readAllRecordsInJsonResource(baseResource, schema);
+    if (recordMap.isEmpty()) {
+      // lastEntry() would be null below; fail with a message that points at the actual problem.
+      throw new IOException("No json records found under resource " + baseResource);
+    }
+
+    Schema outputSchema = recordMap.lastEntry().getValue().getSchema();
+
+    for (Map.Entry<String, RecordIterator> entry : recordMap.entrySet()) {
+      writeAsAvroBinary(entry.getValue(), entry.getValue().getSchema(), fs, new Path(targetPath,
+          removeExtension(entry.getKey()) + ".avro"));
+    }
+
+    return outputSchema;
+  }
+
+  /**
+   * Read schema from an avsc resource file.
+   *
+   * @param resource classpath name of the .avsc resource.
+   * @param loadedClass class whose class loader is used to look the resource up.
+   * @return the parsed schema, or null if the resource does not exist.
+   * @throws IOException if the resource cannot be read.
+   */
+  public static Schema readAvscSchema(String resource, Class<?> loadedClass) throws IOException {
+    try (InputStream is = loadedClass.getClassLoader().getResourceAsStream(resource)) {
+      if (is == null) {
+        return null;
+      }
+      return new Schema.Parser().parse(is);
+    }
+  }
+
+  /**
+   * Write all records of an iterator into a single avro file, overwriting any existing file.
+   *
+   * @param input records to write; the iterator is fully consumed.
+   * @param schema schema stored in the avro file header.
+   * @param fs the {@link FileSystem} to write to.
+   * @param outputPath path of the avro file to create.
+   * @throws IOException if creating or writing the file fails.
+   */
+  private void writeAsAvroBinary(Iterator<GenericRecord> input, Schema schema, FileSystem fs,
+      Path outputPath) throws IOException {
+
+    // try-with-resources closes the writer even when appending a record throws.
+    try (DataFileWriter<GenericRecord> writer =
+        new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>())) {
+      writer.create(schema, fs.create(outputPath, true));
+      while (input.hasNext()) {
+        writer.append(input.next());
+      }
+    }
+
+    log.info("Successfully wrote avro file to path " + outputPath);
+  }
+
+  /**
+   * An iterator over {@link GenericRecord} which is also aware of schema.
+   *
+   * <p>The class only pairs a schema with an existing iterator: the constructor is generated by Lombok from the
+   * two fields below, and the {@link Iterator} methods are forwarded to the wrapped iterator through
+   * {@code @Delegate}.
+   */
+  @AllArgsConstructor
+  public static class RecordIterator implements Iterator<GenericRecord> {
+
+    // Schema associated with the records; exposed through the generated getSchema().
+    @Getter
+    private final Schema schema;
+    // Source of the records; all Iterator calls are delegated to it.
+    @Delegate
+    private final Iterator<GenericRecord> it;
+  }
+
+  /**
+   * A wrapper of {@link GenericRecord} used when the schema of a record is not important in a comparison.
+   * Equality and hash code are based on the positional field values (and field names) only, so wrappers can be
+   * put into hash-based collections to compare records that carry different schema objects.
+   */
+  @AllArgsConstructor
+  public static class GenericRecordWrapper {
+    public GenericRecord record;
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      GenericRecordWrapper that = (GenericRecordWrapper) o;
+      return compareGenericRecordRegardlessOfSchema(record, that.record);
+    }
+
+    /**
+     * Hash the positional field values only, which keeps the hash consistent with {@link #equals(Object)}.
+     */
+    @Override
+    public int hashCode() {
+      int indexLen = record.getSchema().getFields().size();
+      Object[] objArr = new Object[indexLen];
+      for (int i = 0; i < indexLen; i++) {
+        objArr[i] = record.get(i);
+      }
+      return Objects.hash(objArr);
+    }
+
+    /**
+     * Compare two {@link GenericRecord} instances without considering their schema.
+     * Useful when we want to compare two records by discarding some of the fields, like a header.
+     *
+     * <p>Two records match when they have the same number of fields and, position by position, either both
+     * values are null, or the field names are equal and the values are equal.
+     *
+     * @return true if the two records match field by field.
+     */
+    static boolean compareGenericRecordRegardlessOfSchema(GenericRecord r1, GenericRecord r2) {
+      List<Schema.Field> listOfFields1 = r1.getSchema().getFields();
+      List<Schema.Field> listOfFields2 = r2.getSchema().getFields();
+
+      if (listOfFields1.size() != listOfFields2.size()) {
+        return false;
+      }
+
+      for (int i = 0; i < listOfFields1.size(); i++) {
+        Object value1 = r1.get(i);
+        Object value2 = r2.get(i);
+        if (value1 == null && value2 == null) {
+          continue;
+        }
+        // Objects.equals copes with exactly one side being null, which previously raised an NPE.
+        boolean sameName = listOfFields1.get(i).name().equals(listOfFields2.get(i).name());
+        if (!sameName || !Objects.equals(value1, value2)) {
+          return false;
+        }
+      }
+
+      return true;
+    }
+  }
+
+  // Package-private methods shared by different format's tool-kit.
+
+  /**
+   * Strip a trailing ".avro" or ".json" from a file name.
+   * @throws IllegalArgumentException if the name ends with neither extension.
+   */
+  static String removeExtension(String string) {
+    boolean supported = string.endsWith(".avro") || string.endsWith(".json");
+    if (!supported) {
+      throw new IllegalArgumentException("Only support avro and json extensions.");
+    }
+    // Both supported extensions are five characters long, dot included.
+    int extensionLength = 5;
+    return string.substring(0, string.length() - extensionLength);
+  }
+
+  /**
+   * Collect the names of all classpath resources under a base resource that end with ".json".
+   *
+   * @param baseResource resource root used both as the package to scan and as the required name prefix.
+   * @return the names of the matching resources, as reported by the Reflections resource scanner.
+   */
+  static Set<String> getJsonFileSetByResourceRootName(String baseResource) {
+    // Only resources are scanned, and only inputs whose name starts with the base resource are considered.
+    Reflections reflections = new Reflections(new ConfigurationBuilder()
+        .forPackages(baseResource)
+        .filterInputsBy(name -> name.startsWith(baseResource))
+        .setScanners(new ResourcesScanner()));
+
+    return reflections.getResources(url -> url.endsWith(".json"));
+  }
+
+  /**
+   * Check whether a resource can be found through the class loader of {@link AvroTestTools}.
+   *
+   * @param resource name of the resource to look up.
+   * @return true if the class loader resolves the resource to a URL.
+   */
+  public static boolean isResourceExisted(String resource) throws IOException {
+    return AvroTestTools.class.getClassLoader().getResource(resource) != null;
+  }
+}
diff --git 
a/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/DataTestTools.java
 
b/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/DataTestTools.java
new file mode 100644
index 0000000..45c2057
--- /dev/null
+++ 
b/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/DataTestTools.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.binary_creation;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.TreeMap;
+import java.util.function.BiFunction;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+/**
+ * A tool-kit that will be used for:
+ * - Creating binary-format files (Avro, ORC) from records declared in text (.json) files and a schema defined in
+ *   an .avsc file.
+ * - Deserializing binary-format files into traversable in-memory objects.
+ * - Verifying whether the contents of two sets of binary-format files are identical under certain constraints.
+ *
+ * @param <T> Iterator containing specific type of a record row,
+ *           e.g. {@link org.apache.avro.generic.GenericRecord} for Avro.
+ * @param <S> Schema type of a specific data format.
+ */
+@Slf4j
+public abstract class DataTestTools<T, S> {
+  /**
+   * Verify that the two inputs contain the same records in the same file names. Any fields listed in
+   * blacklistRecordFields will not be used for comparison.
+   * Note that this method is destructive to the input records.
+   * @param expected Expected records map, keyed by file name.
+   * @param observed Observed records map, keyed by file name.
+   * @param allowDifferentOrder True if ordering differences should be tolerated by the comparison; how this is
+   *                            honoured is up to the implementation (the Avro tools compare the records of a
+   *                            file as a set).
+   * @param blacklistRecordFields Configurable set of fields that won't be included for comparison of two records.
+   * @param allowDifferentSchema True if schema information should be left out of the comparison (for Avro, a
+   *                             schema can contain attributes that are not relevant to the comparison).
+   * @return true if both inputs contain the same files and records.
+   */
+  public abstract boolean checkSameFilesAndRecords(TreeMap<String, T> expected, TreeMap<String, T> observed,
+      boolean allowDifferentOrder, Collection<String> blacklistRecordFields, boolean allowDifferentSchema);
+
+  /**
+   * Write the json files under a resource folder as files of a specific binary format, like Avro or ORC.
+   * @param baseResource Resource folder that contains the JSON files.
+   * @param fs File system the binary files are written to.
+   * @param targetPath Output path.
+   * @param schema The schema of the written binary files.
+   * @return The schema that was used for the output.
+   * @throws IOException if reading the resources or writing the output fails.
+   */
+  public abstract S writeJsonResourceRecordsAsBinary(String baseResource, FileSystem fs, Path targetPath, S schema)
+      throws IOException;
+
+  /**
+   * Read all records in a json base resource in classpath into a map from file name to iterator of T object.
+   * @param baseResource Base path of the resource directory that contains json file.
+   * @param schema The schema of records.
+   * @return A map between file name to an iterator of objects contained in path.
+   * @throws IOException if a resource cannot be read.
+   */
+  public abstract TreeMap<String, T> readAllRecordsInJsonResource(String baseResource, S schema) throws IOException;
+
+  /**
+   * Read binary-format records into a map from file name to an iterator of T object.
+   * @param fs File system object.
+   * @param path File path
+   * @return A map between file name to an iterator of objects contained in path.
+   * @throws IOException if the files cannot be listed or read.
+   */
+  public abstract TreeMap<String, T> readAllRecordsInBinaryDirectory(FileSystem fs, Path path) throws IOException;
+
+  /**
+   * Compare two iterators element by element, consuming both.
+   *
+   * @param expected iterator over the expected elements.
+   * @param observed iterator over the observed elements.
+   * @param comparator decides whether a pair of elements matches; when null, {@code equals} is used.
+   * @param <E> element type of the iterators (independent of the class-level type parameters).
+   * @return true if both iterators yield the same number of elements and every pair matches.
+   */
+  <E> boolean compareIterators(Iterator<E> expected, Iterator<E> observed, BiFunction<E, E, Boolean> comparator) {
+    while (expected.hasNext()) {
+      if (!observed.hasNext()) {
+        log.error("Expected has more elements than observed.");
+        return false;
+      }
+
+      E t1 = expected.next();
+      E t2 = observed.next();
+
+      boolean equals;
+      if (comparator != null) {
+        equals = comparator.apply(t1, t2);
+      } else {
+        // Null-safe default comparison: two null elements match, a single null never does.
+        equals = t1 == null ? t2 == null : t1.equals(t2);
+      }
+
+      if (!equals) {
+        log.error(String.format("Mismatch: %s does not equal %s.", t1, t2));
+        return false;
+      }
+    }
+
+    if (observed.hasNext()) {
+      log.error("Observed has more elements than expected.");
+      return false;
+    }
+
+    return true;
+  }
+}
diff --git 
a/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/OrcTestTools.java
 
b/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/OrcTestTools.java
new file mode 100644
index 0000000..07c46ab
--- /dev/null
+++ 
b/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/OrcTestTools.java
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.binary_creation;
+
+import com.google.common.collect.AbstractIterator;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TreeMap;
+import javax.annotation.Nullable;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.experimental.Delegate;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.gobblin.util.FileListUtils;
+import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.filters.HiddenFilter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
+import org.apache.hadoop.hive.ql.io.orc.RecordReader;
+import org.apache.hadoop.hive.ql.io.orc.Writer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
+import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
+import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+import static org.apache.gobblin.binary_creation.AvroTestTools.*;
+
+
+// A class that examines ORC-Format file in Purger Integration test.
+@Slf4j
+public class OrcTestTools extends DataTestTools<OrcTestTools.OrcRowIterator, 
TypeInfo> {
+
+  /**
+   * Verify that two maps of files contain the same file names, type information and rows.
+   *
+   * @param expected expected rows read from json resources, keyed by file name (with extension).
+   * @param observed observed rows read from ORC files, keyed by file name (without extension).
+   * @param allowDifferentOrder ORC tools will not use this parameter currently.
+   * @param blacklistRecordFields ORC tools will not use this parameter currently.
+   * @param allowDifferentSchema ORC tools will not use this parameter currently.
+   * @return If two sets of files are identical.
+   * Note that there might be an ordering issue in this comparison method. When one is drafting an ORC integration
+   * test, try to name all json files differently.
+   */
+  @Override
+  public boolean checkSameFilesAndRecords(TreeMap<String, OrcRowIterator> expected,
+      TreeMap<String, OrcRowIterator> observed, boolean allowDifferentOrder, Collection<String> blacklistRecordFields,
+      boolean allowDifferentSchema) {
+    Iterator<String> keys1 = expected.navigableKeySet().iterator();
+    Iterator<String> keys2 = observed.navigableKeySet().iterator();
+
+    return compareIterators(keys1, keys2, (key1, key2) -> {
+      // ORC file doesn't have extension by Linkedin's convention.
+      if (!removeExtension(key1).equals(key2)) {
+        log.error(String.format("Mismatched files: %s and %s", key1, key2));
+        return false;
+      }
+
+      OrcRowIterator it1 = expected.get(key1);
+      OrcRowIterator it2 = observed.get(key2);
+
+      if (!it1.getTypeInfo().equals(it2.getTypeInfo())) {
+        log.error(String.format("Mismatched Typeinfo: %s and %s", key1, key2));
+        return false;
+      }
+
+      while (it1.hasNext()) {
+        if (!it2.hasNext()) {
+          return false;
+        }
+        if (!compareJavaRowAndOrcStruct(((AvroRow) it1.next()).getRow(), (OrcStruct) it2.next())) {
+          return false;
+        }
+      }
+      // Rows left over on the observed side mean the files differ; previously they went unnoticed.
+      return !it2.hasNext();
+    });
+  }
+
+  /**
+   * Given the fact that we couldn't access OrcStruct easily, here uses the hacky way(reflection)
+   * to go around access modifier for integration test purpose only.
+   * Nested structs are compared recursively; all other fields are compared through
+   * {@link #objCastHelper(Object, Writable)}.
+   * @param realRow A row containing a list of Java objects.
+   * @param struct An {@link OrcStruct} which essentially is a list of {@link Writable} objects.
+   * @return true if both rows have the same number of fields and every field matches.
+   */
+  private boolean compareJavaRowAndOrcStruct(Object realRow, OrcStruct struct) {
+    if (realRow == null) {
+      // A missing Java row can never match an existing struct.
+      return false;
+    }
+    ArrayList<?> javaObjRow = (ArrayList<?>) realRow;
+
+    Object[] dataArr;
+    try {
+      Field objectArr = OrcStruct.class.getDeclaredField("fields");
+      objectArr.setAccessible(true);
+      dataArr = (Object[]) objectArr.get(struct);
+    } catch (NoSuchFieldException | IllegalAccessException e) {
+      // Keep the cause so that a reflection failure stays diagnosable.
+      throw new RuntimeException("Failed in compare a java object row and orcstruct", e);
+    }
+
+    if (javaObjRow.size() != dataArr.length) {
+      return false;
+    }
+
+    for (int index = 0; index < dataArr.length; index++) {
+      Object javaField = javaObjRow.get(index);
+      Object dataField = dataArr[index];
+
+      boolean fieldMatches;
+      if (dataField == null) {
+        // A null ORC field only matches a null Java value.
+        fieldMatches = javaField == null;
+      } else if (dataField instanceof OrcStruct) {
+        fieldMatches = compareJavaRowAndOrcStruct(javaField, (OrcStruct) dataField);
+      } else {
+        fieldMatches = objCastHelper(javaField, (Writable) dataField);
+      }
+
+      if (!fieldMatches) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Compare a plain Java value with a primitive {@link Writable}.
+   * All Writable objects passed in here are guaranteed to be primitive writable objects.
+   *
+   * @param javaObj the Java-side value; a null or a value of a different boxed type simply does not match.
+   * @param obj the ORC-side value.
+   * @return true if both sides hold the same value.
+   * @throws RuntimeException if the writable type is not supported yet.
+   */
+  private boolean objCastHelper(Object javaObj, Writable obj) {
+    // The instanceof guards avoid an NPE or ClassCastException from blindly unboxing the Java value.
+    if (obj instanceof IntWritable) {
+      return javaObj instanceof Integer && ((IntWritable) obj).get() == (Integer) javaObj;
+    } else if (obj instanceof Text) {
+      return obj.toString().equals(javaObj);
+    } else if (obj instanceof LongWritable) {
+      return javaObj instanceof Long && ((LongWritable) obj).get() == (Long) javaObj;
+    } else if (obj instanceof ShortWritable) {
+      return javaObj instanceof Short && ((ShortWritable) obj).get() == (Short) javaObj;
+    } else if (obj instanceof DoubleWritable) {
+      return javaObj instanceof Double && ((DoubleWritable) obj).get() == (Double) javaObj;
+    } else {
+      throw new RuntimeException("Cannot recognize the writable type, please enrich the castHelper function");
+    }
+  }
+
+  /**
+   * Materialize records in a classpath package into HDFS ORC records.
+   * @param baseResource name of the package. The package should contain the following:
+   *                     - The schema of the records.
+   *                     - One or more data files called *.json containing the records.
+   *                     NOTE(review): the previous wording named the schema resource "orcSchema" and said
+   *                     .avsc is not used, yet the json reader of this class looks up "schema.avsc" --
+   *                     confirm which is intended.
+   *
+   * @param fs currently unused by this implementation.
+   * @param targetPath the directory where the records will be written; each json file becomes one ORC file
+   *                   named after the json file without its extension.
+   * @param schema currently not forwarded: the records are always read with a null schema argument.
+   * @return the type information of the last file (in key order), which is used to write every file.
+   * @throws IOException if no json resource is found, or if reading or writing fails.
+   */
+  @Override
+  public TypeInfo writeJsonResourceRecordsAsBinary(String baseResource, @Nullable FileSystem fs, Path targetPath,
+      @Nullable TypeInfo schema) throws IOException {
+    TreeMap<String, OrcRowIterator> recordMap = readAllRecordsInJsonResource(baseResource, null);
+    if (recordMap.isEmpty()) {
+      // lastEntry() would be null below; fail with a message that points at the actual problem.
+      throw new IOException("No json records found under resource " + baseResource);
+    }
+
+    TypeInfo outputSchema = recordMap.lastEntry().getValue().getTypeInfo();
+    for (Map.Entry<String, OrcRowIterator> entry : recordMap.entrySet()) {
+      writeAsOrcBinary(entry.getValue(), outputSchema, new Path(targetPath, removeExtension(entry.getKey())));
+    }
+
+    return outputSchema;
+  }
+
+  /**
+   * Writes every row of the given iterator into a single ORC file at outputPath.
+   * Each row must be an {@link AvroRow}. The writer is created lazily on the first row, so no
+   * writer is created for an empty iterator, and it is always closed, even when writing fails.
+   *
+   * @param input the rows to write
+   * @param schema the ORC schema used to build the initial object inspector
+   * @param outputPath the path of the ORC file to create
+   * @throws IOException if the writer cannot be created, written to or closed
+   */
+  private void writeAsOrcBinary(OrcRowIterator input, TypeInfo schema, Path outputPath) throws IOException {
+    Configuration configuration = new Configuration();
+
+    // Note that it doesn't support schema evolution at all.
+    // If the schema in realRow is inconsistent with given schema, writing into disk
+    // would run into failure.
+    ObjectInspector oi = TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(schema);
+    OrcFile.WriterOptions options = OrcFile.writerOptions(configuration).inspector(oi);
+    Writer writer = null;
+
+    try {
+      while (input.hasNext()) {
+        AvroRow avroRow = (AvroRow) input.next();
+        if (writer == null) {
+          // The inspector of the first row replaces the schema-derived one before the writer is created.
+          options.inspector(avroRow.getInspector());
+          writer = OrcFile.createWriter(outputPath, options);
+        }
+        writer.addRow(avroRow.realRow);
+      }
+    } finally {
+      // Close in finally so a failing row does not leak the open writer.
+      if (writer != null) {
+        writer.close();
+      }
+    }
+  }
+
+  // ORC-File Reading related functions
+
+  /**
+   * Reads all json data files of a classpath package as ORC rows.
+   * The Avro schema found in schema.avsc under baseResource is converted into an ORC
+   * {@link TypeInfo}, and every json file is exposed as an {@link OrcRowIterator} keyed by its
+   * path relative to baseResource.
+   *
+   * @param baseResource name of the classpath package holding schema.avsc and the json files
+   * @param schema not used by this implementation
+   * @return the row iterators of all json files, sorted by relative file name
+   */
+  @Override
+  public TreeMap<String, OrcRowIterator> readAllRecordsInJsonResource(String baseResource,
+      @Nullable TypeInfo schema) throws IOException {
+    String avscResource = new File(baseResource, "schema.avsc").toString();
+
+    TypeInfo convertedSchema;
+    try {
+      Schema avroSchema = readAvscSchema(avscResource, OrcTestTools.class);
+      convertedSchema = convertAvroSchemaToOrcSchema(avroSchema);
+    } catch (SerDeException se) {
+      throw new RuntimeException("Provided Avro Schema cannot be transformed to ORC schema", se);
+    }
+
+    TreeMap<String, OrcRowIterator> recordsByFile = new TreeMap<>();
+    for (String jsonFile : getJsonFileSetByResourceRootName(baseResource)) {
+      log.info("Reading json record from " + jsonFile);
+      Path relativePath = PathUtils.relativizePath(new Path(jsonFile), new Path(baseResource));
+      recordsByFile.put(relativePath.toString(), readRecordsFromJsonInputStream(convertedSchema, jsonFile));
+    }
+
+    return recordsByFile;
+  }
+
+  /**
+   * A bundle of a deserialized row object and the {@link ObjectInspector} describing it.
+   * It only implements {@link Writable} so that it can travel through {@link Writable}-typed
+   * iterators; both serialization methods are unsupported.
+   */
+  public static class AvroRow implements Writable {
+    Object realRow;
+    ObjectInspector inspector;
+
+    public AvroRow(Object row, ObjectInspector inspector) {
+      this.inspector = inspector;
+      this.realRow = row;
+    }
+
+    ObjectInspector getInspector() {
+      return this.inspector;
+    }
+
+    Object getRow() {
+      return this.realRow;
+    }
+
+    @Override
+    public void readFields(DataInput dataInput) throws IOException {
+      throw new UnsupportedOperationException("can't read the bundle");
+    }
+
+    @Override
+    public void write(DataOutput dataOutput) throws IOException {
+      throw new UnsupportedOperationException("can't write the bundle");
+    }
+  }
+
+  /**
+   * Reads the json records of a classpath resource as Avro {@link GenericRecord}s and lazily
+   * transforms each of them into a {@link Writable} row for ORC reading and writing.
+   * The Avro schema is read from schema.avsc two directory levels above the file, falling back to
+   * the file's own directory when that lookup returns null. The underlying stream is closed once
+   * the end of the resource is reached or a read error occurs.
+   *
+   * @param typeInfo The ORC schema in {@link TypeInfo} format.
+   * @param file The classpath resource name of the json file.
+   * @return an iterator over the rows of the file which is aware of the given ORC schema.
+   * @throws IOException if the json resource cannot be found on the classpath.
+   */
+  private OrcRowIterator readRecordsFromJsonInputStream(TypeInfo typeInfo, String file) throws IOException {
+
+    // This getParent.getParent is dirty due to we need to simulate multiple-partitions scenarios in iTest.
+    String schemaResourceName = new File(new File(file).getParentFile().getParent(), "schema.avsc").toString();
+
+    Schema attemptedSchema = readAvscSchema(schemaResourceName, OrcTestTools.class);
+    final Schema avroSchema =
+        attemptedSchema == null ? readAvscSchema(new File(new File(file).getParent(), "schema.avsc").toString(),
+            OrcTestTools.class) : attemptedSchema;
+
+    // Open the stream only after the schema is resolved so a schema failure cannot leak it.
+    final InputStream is = OrcTestTools.class.getClassLoader().getResourceAsStream(file);
+    if (is == null) {
+      // getResourceAsStream signals a missing resource with null; report the resource name
+      // instead of letting the decoder fail with an anonymous NullPointerException.
+      throw new IOException("Cannot find json resource " + file + " on the classpath");
+    }
+
+    GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(avroSchema);
+    Decoder decoder = DecoderFactory.get().jsonDecoder(avroSchema, is);
+
+    return new OrcRowIterator(typeInfo, new AbstractIterator<Writable>() {
+      @Override
+      protected Writable computeNext() {
+        try {
+          GenericRecord record = reader.read(null, decoder);
+          return getAvroWritable(record, avroSchema);
+        } catch (java.io.EOFException eof) {
+          // Running out of input is the regular way the iteration ends; nothing to report.
+          closeStream();
+          return endOfData();
+        } catch (IOException e) {
+          // Keep the best-effort behavior of ending the iteration, but make the failure visible.
+          log.warn("Failed to read record from " + file + ", will stop reading and close the stream", e);
+          closeStream();
+          return endOfData();
+        }
+      }
+
+      /** Closes the resource stream, logging instead of propagating a close failure. */
+      private void closeStream() {
+        try {
+          is.close();
+        } catch (IOException ioec) {
+          log.warn("Failed to close the input stream of " + file, ioec);
+        }
+      }
+    });
+  }
+
+  /**
+   * Transforms an Avro record into an {@link AvroRow} that is ready for writing.
+   * The record is deserialized into a Hive object through an {@link AvroSerDe} initialized with
+   * the given schema and bundled with the object inspector generated from the same schema.
+   * One can also choose to use OrcSerDe to obtain ORC-associated writable object.
+   *
+   * Using return object of this method would enable a self-maintained ORC writer(not from OrcOutputFormat)
+   * to write object.
+   *
+   * @param record the Avro record to transform
+   * @param avroSchema the Avro schema of the record
+   * @return the {@link AvroRow} wrapping the deserialized record
+   */
+  private Writable getAvroWritable(GenericRecord record, Schema avroSchema) {
+    Properties serDeProperties = new Properties();
+    serDeProperties.setProperty(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(),
+        avroSchema.toString());
+
+    AvroGenericRecordWritable recordWritable = new AvroGenericRecordWritable(record);
+    recordWritable.setFileSchema(avroSchema);
+
+    try {
+      // Construct AvroSerDe with proper schema and deserialize into Hive object.
+      AvroSerDe avroSerDe = new AvroSerDe();
+      avroSerDe.initialize(null, serDeProperties);
+      Object hiveRow = avroSerDe.deserialize(recordWritable);
+      ObjectInspector rowInspector = new AvroObjectInspectorGenerator(avroSchema).getObjectInspector();
+
+      return new AvroRow(hiveRow, rowInspector);
+    } catch (SerDeException se) {
+      throw new RuntimeException("Failed in SerDe exception:", se);
+    }
+  }
+
+  /**
+   * Exposes the ORC files under a directory as row iterators.
+   * Files are listed recursively (filtered by {@link HiddenFilter}) and each of them is mapped,
+   * by its path relative to the given directory, to an {@link OrcRowIterator} reading its rows
+   * lazily. A record reader is opened for every file up front and is closed once its iterator is
+   * exhausted or fails.
+   *
+   * @param fs the file system holding the ORC files
+   * @param path the directory to read; an empty map is returned if it does not exist
+   * @return the row iterators of all files, sorted by relative path
+   * @throws IOException if listing the directory or opening a file fails
+   */
+  @Override
+  public TreeMap<String, OrcRowIterator> readAllRecordsInBinaryDirectory(FileSystem fs, Path path) throws IOException {
+    TreeMap<String, OrcRowIterator> output = new TreeMap<>();
+    if (!fs.exists(path)) {
+      return output;
+    }
+    PathFilter pathFilter = new HiddenFilter();
+    for (FileStatus status : FileListUtils.listFilesRecursively(fs, path, pathFilter)) {
+      String key = PathUtils.relativizePath(status.getPath(), path).toString();
+      Reader orcReader = OrcFile.createReader(fs, status.getPath());
+      RecordReader recordReader = orcReader.rows();
+
+      output.put(key, new OrcRowIterator(TypeInfoUtils.getTypeInfoFromObjectInspector(orcReader.getObjectInspector()),
+          new AbstractIterator<Writable>() {
+            @Override
+            protected Writable computeNext() {
+              try {
+                if (recordReader.hasNext()) {
+                  return (Writable) recordReader.next(null);
+                }
+                recordReader.close();
+                return endOfData();
+              } catch (IOException ioe) {
+                log.warn("Failed to process orc record reader, will terminate reader immediately", ioe);
+                // The reader is abandoned from here on, so release it instead of leaking it.
+                closeReaderQuietly();
+                return endOfData();
+              }
+            }
+
+            /** Closes the record reader, logging instead of propagating a close failure. */
+            private void closeReaderQuietly() {
+              try {
+                recordReader.close();
+              } catch (IOException closeFailure) {
+                log.warn("Failed to close orc record reader", closeFailure);
+              }
+            }
+          }));
+    }
+
+    return output;
+  }
+
+  /**
+   * An iterator over {@link Writable} rows which is also aware of the ORC schema
+   * ({@link TypeInfo}) of those rows. All {@link Iterator} methods are delegated to the
+   * wrapped iterator; the schema is exposed through the generated getter.
+   */
+  @AllArgsConstructor
+  public static class OrcRowIterator implements Iterator<Writable> {
+
+    @Getter
+    private final TypeInfo typeInfo;
+    @Delegate
+    private final Iterator<Writable> it;
+  }
+
+  /**
+   * Convert Avro schema into TypeInfo.
+   * Current version of Hive used by Gobblin open-source(1.0.1) doesn't have 
{@link org.apache.orc.TypeDescription}
+   * and utilities associated with it. So instead {@link TypeInfo} is being 
used to represent Orc schema.
+   * Note that {@link TypeInfo} is not case preserving as it is actually the 
internal schema representation of Hive.
+   */
+  public static TypeInfo convertAvroSchemaToOrcSchema(Schema avroSchema) 
throws SerDeException {
+        return TypeInfoUtils.getTypeInfoFromObjectInspector(
+            new AvroObjectInspectorGenerator(avroSchema).getObjectInspector());
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-binary-management/src/test/java/org/apache/gobblin/binary_creation/AvroTestToolsTest.java
 
b/gobblin-binary-management/src/test/java/org/apache/gobblin/binary_creation/AvroTestToolsTest.java
new file mode 100644
index 0000000..22004c0
--- /dev/null
+++ 
b/gobblin-binary-management/src/test/java/org/apache/gobblin/binary_creation/AvroTestToolsTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.binary_creation;
+
+import com.google.common.io.Files;
+import java.io.File;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import org.apache.gobblin.binary_creation.AvroTestTools.*;
+
+
+/**
+ * Tests for {@link AvroTestTools}: the json-to-Avro round trip and the schema-agnostic record
+ * comparison provided by {@link AvroTestTools.GenericRecordWrapper}.
+ */
+public class AvroTestToolsTest {
+
+  /**
+   * Writes the json records of the avroWriterTest resource as Avro binaries and verifies that
+   * reading them back yields the same files and records.
+   */
+  @Test
+  public void test() throws Exception {
+    DataTestTools testTools = new AvroTestTools();
+
+    String resourceName = "avroWriterTest";
+
+    File tmpDir = Files.createTempDir();
+
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    Path output = new Path(tmpDir.getAbsolutePath(), "test");
+
+    try {
+      testTools.writeJsonResourceRecordsAsBinary(resourceName, fs, output, null);
+
+      Assert.assertTrue(testTools.checkSameFilesAndRecords(testTools.readAllRecordsInJsonResource(resourceName, null),
+          testTools.readAllRecordsInBinaryDirectory(fs, output), false, null, true));
+    } finally {
+      // Do not leave the generated binaries behind in the temp directory.
+      fs.delete(new Path(tmpDir.getAbsolutePath()), true);
+    }
+  }
+
+  /**
+   * Verifies that wrapped records compare by their data only: records that differ merely in the
+   * namespace of their schema are equal once wrapped, records with different data are not.
+   */
+  @Test
+  public void testGenericRecordDataComparisonWithoutSchema() throws Exception {
+    Schema avroSchema = (new Schema.Parser()).parse(
+        "{\n" + "  \"namespace\": \"com.linkedin.compliance.test\",\n" + "  \"type\": \"record\",\n"
+            + "  \"name\": \"SimpleTest\",\n" + "  \"fields\": [\n" + "    {\n" + "      \"name\": \"memberId\",\n"
+            + "      \"type\": \"int\"\n" + "    },\n" + "    {\n" + "      \"name\": \"name\",\n"
+            + "      \"type\": \"string\"\n" + "    }\n" + "  ]\n" + "}");
+
+    Schema avroSchemaDiffInNamespace = (new Schema.Parser()).parse(
+        "{\n" + "  \"namespace\": \"com.linkedin.whatever\",\n" + "  \"type\": \"record\",\n"
+            + "  \"name\": \"SimpleTest\",\n" + "  \"fields\": [\n" + "    {\n" + "      \"name\": \"memberId\",\n"
+            + "      \"type\": \"int\"\n" + "    },\n" + "    {\n" + "      \"name\": \"name\",\n"
+            + "      \"type\": \"string\"\n" + "    }\n" + "  ]\n" + "}");
+
+    Schema nullableSchema = (new Schema.Parser()).parse(
+        "{\n" + "  \"namespace\": \"com.linkedin.compliance.test\",\n" + "  \"type\": \"record\",\n"
+            + "  \"name\": \"SimpleTest\",\n" + "  \"fields\": [\n" + "    {\n" + "      \"name\": \"memberId\",\n"
+            + "      \"type\": [\n" + "        \"null\",\n" + "        \"int\",\n" + "        \"string\"\n"
+            + "      ]\n" + "    },\n" + "    {\n" + "      \"name\": \"name\",\n" + "      \"type\": \"string\"\n"
+            + "    }\n" + "  ]\n" + "}");
+
+    GenericRecordBuilder builder_0 = new GenericRecordBuilder(avroSchema);
+    builder_0.set("memberId", "1");
+    builder_0.set("name", "alice");
+    GenericData.Record record_0 = builder_0.build();
+
+    GenericRecordBuilder builder_1 = new GenericRecordBuilder(avroSchemaDiffInNamespace);
+    builder_1.set("memberId", "1");
+    builder_1.set("name", "alice");
+    GenericData.Record record_1 = builder_1.build();
+
+    GenericRecordBuilder builder_2 = new GenericRecordBuilder(avroSchemaDiffInNamespace);
+    builder_2.set("memberId", "1");
+    builder_2.set("name", "alice");
+    GenericData.Record record_2 = builder_2.build();
+
+    GenericRecordBuilder builder_3 = new GenericRecordBuilder(avroSchemaDiffInNamespace);
+    builder_3.set("memberId", "2");
+    builder_3.set("name", "bob");
+    GenericData.Record record_3 = builder_3.build();
+
+    GenericRecordBuilder builder_4 = new GenericRecordBuilder(nullableSchema);
+    builder_4.set("memberId", null);
+    builder_4.set("name", "bob");
+    GenericData.Record record_4 = builder_4.build();
+
+    GenericRecordBuilder builder_5 = new GenericRecordBuilder(nullableSchema);
+    builder_5.set("memberId", null);
+    builder_5.set("name", "bob");
+    GenericData.Record record_5 = builder_5.build();
+
+    // Unwrapped records take the schema into account, so a different namespace makes them unequal.
+    Assert.assertFalse(record_0.equals(record_1));
+
+    AvroTestTools.GenericRecordWrapper wrapper_0 = new GenericRecordWrapper(record_0);
+    GenericRecordWrapper wrapper_1 = new GenericRecordWrapper(record_1);
+    GenericRecordWrapper wrapper_2 = new GenericRecordWrapper(record_2);
+    GenericRecordWrapper wrapper_3 = new GenericRecordWrapper(record_3);
+    GenericRecordWrapper wrapper_4 = new GenericRecordWrapper(record_4);
+    GenericRecordWrapper wrapper_5 = new GenericRecordWrapper(record_5);
+
+    Assert.assertEquals(wrapper_0, wrapper_1);
+    Assert.assertEquals(wrapper_1, wrapper_2);
+    // assertNotSame only compares references and can never fail for two distinct wrappers;
+    // the intent is that wrappers around different data are not equal.
+    Assert.assertNotEquals(wrapper_2, wrapper_3);
+    Assert.assertEquals(wrapper_4, wrapper_5);
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-binary-management/src/test/java/org/apache/gobblin/binary_creation/OrcTestToolsTest.java
 
b/gobblin-binary-management/src/test/java/org/apache/gobblin/binary_creation/OrcTestToolsTest.java
new file mode 100644
index 0000000..49a0e79
--- /dev/null
+++ 
b/gobblin-binary-management/src/test/java/org/apache/gobblin/binary_creation/OrcTestToolsTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.binary_creation;
+
+import com.google.common.io.Files;
+import java.io.File;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.orc.TypeDescription;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests for {@link OrcTestTools}: the json-to-ORC round trip and the Avro-schema-to-ORC-schema
+ * conversion.
+ */
+public class OrcTestToolsTest {
+
+  public DataTestTools orcTools = new OrcTestTools();
+
+  /**
+   * Writes the json records of the orcWriterTest resource as ORC binaries and verifies that
+   * reading them back yields the same files and records.
+   */
+  @Test
+  public void test() throws Exception {
+
+    String resourceName = "orcWriterTest";
+
+    File tmpDir = Files.createTempDir();
+
+    FileSystem fs = FileSystem.get(new Configuration());
+
+    Path output = new Path(tmpDir.getAbsolutePath(), "test");
+
+    try {
+      orcTools.writeJsonResourceRecordsAsBinary(resourceName, null, output, null);
+
+      Assert.assertTrue(orcTools.checkSameFilesAndRecords(orcTools.readAllRecordsInJsonResource(resourceName, null),
+          orcTools.readAllRecordsInBinaryDirectory(fs, output), true, null, false));
+    } finally {
+      // Do not leave the generated binaries behind in the temp directory.
+      fs.delete(new Path(tmpDir.getAbsolutePath()), true);
+    }
+  }
+
+  /**
+   * Verifies the conversion of flat and nested Avro schemas into ORC {@link TypeInfo}.
+   */
+  @Test
+  public void testSchemaToTypeInfoConversion() throws Exception {
+    // Simple non-nested case:
+    Schema avroSchema = SchemaBuilder.record("test")
+        .fields()
+        .name("id")
+        .type()
+        .intType()
+        .noDefault()
+        .name("timestamp")
+        .type()
+        .stringType()
+        .noDefault()
+        .endRecord();
+
+    TypeInfo orcSchema = OrcTestTools.convertAvroSchemaToOrcSchema(avroSchema);
+    String targetOrcSchemaString = "struct<id:int,timestamp:string>";
+    // TestNG expects (actual, expected); the reverse order yields a misleading failure message.
+    Assert.assertEquals(orcSchema.toString(), targetOrcSchemaString);
+
+    // Nested case:
+    avroSchema = SchemaBuilder.record("nested")
+        .fields()
+        .name("nestedId")
+        .type()
+        .array()
+        .items()
+        .stringType()
+        .noDefault()
+        .name("timestamp")
+        .type()
+        .stringType()
+        .noDefault()
+        .endRecord();
+    orcSchema = OrcTestTools.convertAvroSchemaToOrcSchema(avroSchema);
+    TypeDescription targetTypeDescription = TypeDescription.createStruct()
+        .addField("nestedId", TypeDescription.createList(TypeDescription.createString()))
+        .addField("timestamp", TypeDescription.createString());
+    // TypeInfo is not case preserving, hence the case-insensitive comparison.
+    Assert.assertEquals(orcSchema.toString().toLowerCase(), targetTypeDescription.toString().toLowerCase());
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-binary-management/src/test/resources/avroWriterTest/data1.json 
b/gobblin-binary-management/src/test/resources/avroWriterTest/data1.json
new file mode 100644
index 0000000..59be71c
--- /dev/null
+++ b/gobblin-binary-management/src/test/resources/avroWriterTest/data1.json
@@ -0,0 +1,2 @@
+{"name": "Alyssa", "favorite_number": {"int": 256}, "favorite_color": null}
+{"name": "Ben", "favorite_number": {"int": 7}, "favorite_color": {"string": 
"red"}}
diff --git 
a/gobblin-binary-management/src/test/resources/avroWriterTest/data2.json 
b/gobblin-binary-management/src/test/resources/avroWriterTest/data2.json
new file mode 100644
index 0000000..ead75c4
--- /dev/null
+++ b/gobblin-binary-management/src/test/resources/avroWriterTest/data2.json
@@ -0,0 +1 @@
+{"name": "Charlie", "favorite_number": null, "favorite_color": {"string": 
"blue"}}
diff --git 
a/gobblin-binary-management/src/test/resources/avroWriterTest/schema.avsc 
b/gobblin-binary-management/src/test/resources/avroWriterTest/schema.avsc
new file mode 100644
index 0000000..8d88e5e
--- /dev/null
+++ b/gobblin-binary-management/src/test/resources/avroWriterTest/schema.avsc
@@ -0,0 +1,25 @@
+{
+  "namespace": "example.avro",
+  "type": "record",
+  "name": "User",
+  "fields": [
+    {
+      "name": "name",
+      "type": "string"
+    },
+    {
+      "name": "favorite_number",
+      "type": [
+        "null",
+        "int"
+      ]
+    },
+    {
+      "name": "favorite_color",
+      "type": [
+        "null",
+        "string"
+      ]
+    }
+  ]
+}
diff --git 
a/gobblin-binary-management/src/test/resources/orcWriterTest/data1.json 
b/gobblin-binary-management/src/test/resources/orcWriterTest/data1.json
new file mode 100644
index 0000000..17fdea9
--- /dev/null
+++ b/gobblin-binary-management/src/test/resources/orcWriterTest/data1.json
@@ -0,0 +1 @@
+{"memberId": 1, "name": "Alyssa"}
\ No newline at end of file
diff --git 
a/gobblin-binary-management/src/test/resources/orcWriterTest/data2.json 
b/gobblin-binary-management/src/test/resources/orcWriterTest/data2.json
new file mode 100644
index 0000000..a798165
--- /dev/null
+++ b/gobblin-binary-management/src/test/resources/orcWriterTest/data2.json
@@ -0,0 +1 @@
+{"memberId": 2, "name": "Bob"}
\ No newline at end of file
diff --git 
a/gobblin-binary-management/src/test/resources/orcWriterTest/schema.avsc 
b/gobblin-binary-management/src/test/resources/orcWriterTest/schema.avsc
new file mode 100644
index 0000000..f9fdf51
--- /dev/null
+++ b/gobblin-binary-management/src/test/resources/orcWriterTest/schema.avsc
@@ -0,0 +1,15 @@
+{
+  "namespace": "com.linkedin.compliance.test",
+  "type": "record",
+  "name": "SimpleTest",
+  "fields": [
+    {
+      "name": "memberId",
+      "type": "int"
+    },
+    {
+      "name": "name",
+      "type": "string"
+    }
+  ]
+}
diff --git a/gradle/scripts/dependencyDefinitions.gradle 
b/gradle/scripts/dependencyDefinitions.gradle
index 0009397..f669316 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -167,6 +167,7 @@ ext.externalDependency = [
     "opencsv": "com.opencsv:opencsv:3.8",
     "grok": "io.thekraken:grok:0.1.5",
     "hadoopAdl" : "org.apache.hadoop:hadoop-azure-datalake:3.0.0-alpha2",
+    "orcMapreduce":"org.apache.orc:orc-mapreduce:1.5.4",
     'parquet': 'com.twitter:parquet-hadoop-bundle:1.5.0',
     'reactivex': 'io.reactivex.rxjava2:rxjava:2.1.0',
     "slf4j": [
diff --git a/settings.gradle b/settings.gradle
index 629345c..f1ceb05 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -41,7 +41,8 @@ def modules = ['gobblin-admin',
                'gobblin-cluster',
                'gobblin-aws',
                'gobblin-service',
-               'gobblin-test-utils']
+               'gobblin-test-utils',
+               'gobblin-binary-management']
 
 // Disable jacoco for now as Kafka 0.8 is the default version and jacoco does 
not like the same classes
 // being declared in different modules

Reply via email to