This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4483268  [GOBBLIN-1008] Upgrading parquet dependency to 
org.apache.parquet. Fixing tests
4483268 is described below

commit 4483268e3885adb63da7253df5ab0c00eb86aece
Author: Shirshanka Das <[email protected]>
AuthorDate: Tue Dec 24 21:15:39 2019 -0800

    [GOBBLIN-1008] Upgrading parquet dependency to org.apache.parquet. Fixing 
tests
    
    Closes #2853 from shirshanka/parquet-upgrade
---
 .../finder/HdfsModifiedTimeHiveVersionFinder.java  |   4 +-
 gobblin-docs/sinks/ParquetHdfsDataWriter.md        |  26 ++-
 .../build.gradle                                   |   1 +
 .../parquet/JsonElementConversionFactory.java      |  63 ++---
 .../JsonIntermediateToParquetGroupConverter.java   |  11 +-
 .../gobblin/converter/parquet/ParquetGroup.java    |  56 +++--
 .../gobblin/writer/ParquetDataWriterBuilder.java   |  24 +-
 .../gobblin/writer/ParquetHdfsDataWriter.java      |   6 +-
 ...sonIntermediateToParquetGroupConverterTest.java |  14 +-
 .../gobblin/writer/ParquetHdfsDataWriterTest.java  |  20 +-
 .../org/apache/gobblin/writer/TestConstants.java   |  14 +-
 .../JsonIntermediateToParquetConverter.json        | 259 +++++++++++++++++++++
 .../build.gradle                                   |   1 -
 .../gobblin/converter/parquet/JsonSchema.java      |  14 +-
 gobblin-modules/gobblin-parquet/build.gradle       |   3 +-
 .../parquet/JsonElementConversionFactory.java      |  23 +-
 .../JsonIntermediateToParquetGroupConverter.java   |  12 +-
 .../gobblin/converter/parquet/ParquetGroup.java    |   1 +
 .../gobblin/writer/ParquetDataWriterBuilder.java   |   5 +-
 .../gobblin/writer/ParquetHdfsDataWriter.java      |   6 +-
 ...sonIntermediateToParquetGroupConverterTest.java |  11 +-
 .../gobblin/writer/ParquetHdfsDataWriterTest.java  |   6 +-
 .../org/apache/gobblin/writer/TestConstants.java   |   2 +-
 gobblin-test-harness/build.gradle                  |   1 +
 .../gobblin/GobblinLocalJobLauncherUtils.java      |   4 +-
 .../gobblin/TaskSkipErrRecordsIntegrationTest.java |  12 +-
 .../gobblin/WriterOutputFormatIntegrationTest.java |  19 +-
 .../writer_output_format_test.properties           |   7 +-
 gradle/scripts/dependencyDefinitions.gradle        |   3 +-
 29 files changed, 475 insertions(+), 153 deletions(-)

diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/HdfsModifiedTimeHiveVersionFinder.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/HdfsModifiedTimeHiveVersionFinder.java
index 8a7cb87..c427962 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/HdfsModifiedTimeHiveVersionFinder.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/HdfsModifiedTimeHiveVersionFinder.java
@@ -18,15 +18,15 @@ package org.apache.gobblin.data.management.version.finder;
 
 import java.io.IOException;
 
-import 
org.apache.gobblin.data.management.version.TimestampedHiveDatasetVersion;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.joda.time.DateTime;
 
+import com.google.common.base.Preconditions;
 import com.typesafe.config.Config;
 
-import parquet.Preconditions;
+import 
org.apache.gobblin.data.management.version.TimestampedHiveDatasetVersion;
 
 
 /**
diff --git a/gobblin-docs/sinks/ParquetHdfsDataWriter.md 
b/gobblin-docs/sinks/ParquetHdfsDataWriter.md
index f3ad0da..a7a24ea 100644
--- a/gobblin-docs/sinks/ParquetHdfsDataWriter.md
+++ b/gobblin-docs/sinks/ParquetHdfsDataWriter.md
@@ -1,6 +1,6 @@
 # Description
 
-An extension to 
[`FsDataWriter`](https://github.com/apache/incubator-gobblin/blob/master/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java)
 that writes in Parquet format in the form of 
[`Group.java`](https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/example/data/Group.java).
 This implementation allows users to specify the CodecFactory to use through 
the configuration property [`writer.codec.type`](https://gobblin.readthe [...]
+An extension to 
[`FsDataWriter`](https://github.com/apache/incubator-gobblin/blob/master/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java)
 that writes in Parquet format in the form of 
[`Group.java`](https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/example/data/Group.java).
 This implementation allows users to specify the CodecFactory to use through 
the configuration property [`writer.codec.type`](https://gobblin.readthe [...]
 
 # Usage
 ```
@@ -8,10 +8,6 @@ 
writer.builder.class=org.apache.gobblin.writer.ParquetDataWriterBuilder
 writer.destination.type=HDFS
 writer.output.format=PARQUET
 ```
-For more info, see 
-[`ParquetHdfsDataWriter`](https://github.com/apache/incubator-gobblin/blob/master/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java)
-and
-[`ParquetDataWriterBuilder`](https://github.com/apache/incubator-gobblin/blob/master/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java)
 
 
 # Configuration
@@ -22,4 +18,22 @@ and
 | writer.parquet.dictionary.page.size | The block size threshold for the 
dictionary pages. | 134217728 | No |
 | writer.parquet.dictionary | To turn dictionary encoding on. Parquet has a 
dictionary encoding for data with a small number of unique values ( < 10^5 ) 
that aids in significant compression and boosts processing speed. | true | No |
 | writer.parquet.validate | To turn on validation using the schema. This 
validation is done by 
[`ParquetWriter`](https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java)
 not by Gobblin. | false | No |
-| writer.parquet.version | Version of parquet writer to use. Available 
versions are v1 and v2. | v1 | No |
\ No newline at end of file
+| writer.parquet.version | Version of parquet writer to use. Available 
versions are v1 and v2. | v1 | No |
+
+# Developer Notes
+
+Gobblin provides integration with two different versions of Parquet through 
its modules. Use the appropriate jar based on the Parquet library you use in 
your code.
+
+| Jar | Dependency | Gobblin Release |
+|-----|-------------|--------|
+| 
[`gobblin-parquet`](https://mvnrepository.com/artifact/org.apache.gobblin/gobblin-parquet)
 | 
[`com.twitter:parquet-hadoop-bundle`](https://mvnrepository.com/artifact/com.twitter/parquet-hadoop-bundle)
 | >= 0.12.0 |
+| 
[`gobblin-parquet-apache`](https://mvnrepository.com/artifact/org.apache.gobblin/gobblin-parquet-apache)
 | 
[`org.apache.parquet:parquet-hadoop`](https://mvnrepository.com/artifact/org.apache.parquet/parquet-hadoop)
 | >= 0.15.0 |
+
+If you want to look at the code, check out:
+
+| Module | File |
+| ------ | ---- |
+| gobblin-parquet | 
[`ParquetHdfsDataWriter`](https://github.com/apache/incubator-gobblin/blob/master/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java)
 |
+| gobblin-parquet | 
[`ParquetDataWriterBuilder`](https://github.com/apache/incubator-gobblin/blob/master/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java)
 |
+| gobblin-parquet-apache | 
[`ParquetHdfsDataWriter`](https://github.com/apache/incubator-gobblin/blob/master/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java)
 |
+| gobblin-parquet-apache | 
[`ParquetDataWriterBuilder`](https://github.com/apache/incubator-gobblin/blob/master/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java)
 |
diff --git a/gobblin-modules/gobblin-parquet/build.gradle 
b/gobblin-modules/gobblin-parquet-apache/build.gradle
similarity index 96%
copy from gobblin-modules/gobblin-parquet/build.gradle
copy to gobblin-modules/gobblin-parquet-apache/build.gradle
index 75530b9..560638d 100644
--- a/gobblin-modules/gobblin-parquet/build.gradle
+++ b/gobblin-modules/gobblin-parquet-apache/build.gradle
@@ -19,6 +19,7 @@ apply plugin: 'java'
 
 dependencies {
   compile project(":gobblin-core")
+  compile project(":gobblin-modules:gobblin-parquet-common")
 
   compile externalDependency.gson
   compile externalDependency.parquet
diff --git 
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonElementConversionFactory.java
 
b/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/converter/parquet/JsonElementConversionFactory.java
similarity index 88%
copy from 
gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonElementConversionFactory.java
copy to 
gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/converter/parquet/JsonElementConversionFactory.java
index 288a5de..822eb65 100644
--- 
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonElementConversionFactory.java
+++ 
b/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/converter/parquet/JsonElementConversionFactory.java
@@ -14,44 +14,45 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gobblin.converter.parquet;
 
+package org.apache.gobblin.converter.parquet;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.gobblin.converter.parquet.JsonSchema.*;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.BinaryValue;
+import org.apache.parquet.example.data.simple.BooleanValue;
+import org.apache.parquet.example.data.simple.DoubleValue;
+import org.apache.parquet.example.data.simple.FloatValue;
+import org.apache.parquet.example.data.simple.IntegerValue;
+import org.apache.parquet.example.data.simple.LongValue;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
 
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 
-import parquet.example.data.Group;
-import parquet.example.data.simple.BinaryValue;
-import parquet.example.data.simple.BooleanValue;
-import parquet.example.data.simple.DoubleValue;
-import parquet.example.data.simple.FloatValue;
-import parquet.example.data.simple.IntegerValue;
-import parquet.example.data.simple.LongValue;
-import parquet.io.api.Binary;
-import parquet.schema.GroupType;
-import parquet.schema.MessageType;
-import parquet.schema.PrimitiveType;
-import parquet.schema.PrimitiveType.PrimitiveTypeName;
-import parquet.schema.Type;
-import parquet.schema.Types;
+import org.apache.gobblin.converter.parquet.JsonSchema.*;
 
-import static 
org.apache.gobblin.converter.parquet.JsonElementConversionFactory.RecordConverter.RecordType.CHILD;
 import static org.apache.gobblin.converter.parquet.JsonSchema.*;
 import static org.apache.gobblin.converter.parquet.JsonSchema.InputType.STRING;
-import static parquet.schema.OriginalType.UTF8;
-import static parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
-import static parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
-import static parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
-import static parquet.schema.Type.Repetition.OPTIONAL;
-import static parquet.schema.Type.Repetition.REPEATED;
+import static 
org.apache.gobblin.converter.parquet.JsonElementConversionFactory.RecordConverter.RecordType.CHILD;
+import static org.apache.parquet.schema.OriginalType.UTF8;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
 
 
 /**
@@ -177,7 +178,7 @@ public class JsonElementConversionFactory {
     }
 
     protected Type buildSchema() {
-      return new PrimitiveType(this.repeated ? REPEATED : 
this.jsonSchema.optionalOrRequired(), this.outputType,
+      return new PrimitiveType(this.repeated ? REPEATED : 
optionalOrRequired(this.jsonSchema), this.outputType,
           this.jsonSchema.getColumnName());
     }
 
@@ -294,7 +295,7 @@ public class JsonElementConversionFactory {
       if (this.repeated) {
         return Types.repeated(BINARY).as(UTF8).named(columnName);
       }
-      switch (this.jsonSchema.optionalOrRequired()) {
+      switch (optionalOrRequired(this.jsonSchema)) {
         case OPTIONAL:
           return Types.optional(BINARY).as(UTF8).named(columnName);
         case REQUIRED:
@@ -305,6 +306,10 @@ public class JsonElementConversionFactory {
     }
   }
 
+  public static Type.Repetition optionalOrRequired(JsonSchema jsonBaseSchema) {
+    return jsonBaseSchema.isNullable() ? OPTIONAL : REQUIRED;
+  }
+
   public static class ArrayConverter extends CollectionConverter {
 
     public ArrayConverter(JsonSchema arraySchema) {
@@ -325,7 +330,7 @@ public class JsonElementConversionFactory {
     protected Type buildSchema() {
       List<Type> fields = new ArrayList<>();
       fields.add(0, this.elementConverter.schema());
-      return new GroupType(this.jsonSchema.optionalOrRequired(), 
this.jsonSchema.getColumnName(), fields);
+      return new GroupType(optionalOrRequired(jsonSchema), 
this.jsonSchema.getColumnName(), fields);
     }
 
     @Override
@@ -396,7 +401,7 @@ public class JsonElementConversionFactory {
         JsonElementConverter converter = this.converters.get(key);
         Object convertedValue = converter.convert(entry.getValue());
         boolean valueIsNull = convertedValue == null;
-        Type.Repetition repetition = converter.jsonSchema.optionalOrRequired();
+        Type.Repetition repetition = optionalOrRequired(converter.jsonSchema);
         if (valueIsNull && repetition.equals(OPTIONAL)) {
           continue;
         }
@@ -422,7 +427,7 @@ public class JsonElementConversionFactory {
         case ROOT:
           return new MessageType(docName, parquetTypes);
         case CHILD:
-          return new GroupType(this.jsonSchema.optionalOrRequired(), docName, 
parquetTypes);
+          return new GroupType(optionalOrRequired(this.jsonSchema), docName, 
parquetTypes);
         default:
           throw new RuntimeException("Unsupported Record type");
       }
@@ -463,7 +468,7 @@ public class JsonElementConversionFactory {
           Types.repeatedGroup().addFields(keyConverter.schema(), 
elementConverter.schema()).named(MAP_KEY)
               .asGroupType();
       String columnName = this.jsonSchema.getColumnName();
-      switch (this.jsonSchema.optionalOrRequired()) {
+      switch (optionalOrRequired(this.jsonSchema)) {
         case OPTIONAL:
           return 
Types.optionalGroup().addFields(mapGroup).named(columnName).asGroupType();
         case REQUIRED:
diff --git 
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverter.java
 
b/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverter.java
similarity index 96%
copy from 
gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverter.java
copy to 
gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverter.java
index b04dcf8..1eda5b7 100644
--- 
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverter.java
+++ 
b/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverter.java
@@ -15,6 +15,11 @@
  * limitations under the License.
  */
 package org.apache.gobblin.converter.parquet;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.schema.MessageType;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
 
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.converter.Converter;
@@ -23,12 +28,6 @@ import 
org.apache.gobblin.converter.SchemaConversionException;
 import org.apache.gobblin.converter.SingleRecordIterable;
 import 
org.apache.gobblin.converter.parquet.JsonElementConversionFactory.RecordConverter;
 
-import com.google.gson.JsonArray;
-import com.google.gson.JsonObject;
-
-import parquet.example.data.Group;
-import parquet.schema.MessageType;
-
 import static 
org.apache.gobblin.converter.parquet.JsonElementConversionFactory.RecordConverter.RecordType.ROOT;
 
 
diff --git 
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/ParquetGroup.java
 
b/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/converter/parquet/ParquetGroup.java
similarity index 82%
copy from 
gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/ParquetGroup.java
copy to 
gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/converter/parquet/ParquetGroup.java
index 783d845..0b7e409 100644
--- 
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/ParquetGroup.java
+++ 
b/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/converter/parquet/ParquetGroup.java
@@ -15,27 +15,26 @@
  * limitations under the License.
  */
 package org.apache.gobblin.converter.parquet;
-
 import java.util.ArrayList;
 import java.util.List;
 
-import parquet.example.data.Group;
-import parquet.example.data.simple.BinaryValue;
-import parquet.example.data.simple.BooleanValue;
-import parquet.example.data.simple.DoubleValue;
-import parquet.example.data.simple.FloatValue;
-import parquet.example.data.simple.Int96Value;
-import parquet.example.data.simple.IntegerValue;
-import parquet.example.data.simple.LongValue;
-import parquet.example.data.simple.NanoTime;
-import parquet.example.data.simple.Primitive;
-import parquet.io.api.Binary;
-import parquet.io.api.RecordConsumer;
-import parquet.schema.GroupType;
-import parquet.schema.PrimitiveType;
-import parquet.schema.Type;
-
-import static parquet.schema.Type.Repetition.REPEATED;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.BinaryValue;
+import org.apache.parquet.example.data.simple.BooleanValue;
+import org.apache.parquet.example.data.simple.DoubleValue;
+import org.apache.parquet.example.data.simple.FloatValue;
+import org.apache.parquet.example.data.simple.Int96Value;
+import org.apache.parquet.example.data.simple.IntegerValue;
+import org.apache.parquet.example.data.simple.LongValue;
+import org.apache.parquet.example.data.simple.NanoTime;
+import org.apache.parquet.example.data.simple.Primitive;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
 
 
 /**
@@ -83,6 +82,7 @@ public class ParquetGroup extends Group {
     return result.toString();
   }
 
+  @Override
   public Group addGroup(int fieldIndex) {
     ParquetGroup g = new 
ParquetGroup(this.schema.getType(fieldIndex).asGroupType());
     this.data[fieldIndex].add(g);
@@ -139,6 +139,21 @@ public class ParquetGroup extends Group {
     return ((IntegerValue) this.getValue(fieldIndex, index)).getInteger();
   }
 
+  @Override
+  public long getLong(int fieldIndex, int index) {
+    return ((LongValue) this.getValue(fieldIndex, index)).getLong();
+  }
+
+  @Override
+  public double getDouble(int fieldIndex, int index) {
+    return ((DoubleValue) this.getValue(fieldIndex, index)).getDouble();
+  }
+
+  @Override
+  public float getFloat(int fieldIndex, int index) {
+    return ((FloatValue) this.getValue(fieldIndex, index)).getFloat();
+  }
+
   public boolean getBoolean(int fieldIndex, int index) {
     return ((BooleanValue) this.getValue(fieldIndex, index)).getBoolean();
   }
@@ -193,6 +208,11 @@ public class ParquetGroup extends Group {
     this.add(fieldIndex, new DoubleValue(value));
   }
 
+  @Override
+  public void add(int fieldIndex, Group value) {
+    this.data[fieldIndex].add(value);
+  }
+
   public GroupType getType() {
     return this.schema;
   }
diff --git 
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java
 
b/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java
similarity index 88%
copy from 
gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java
copy to 
gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java
index 7ce2020..ac4e2d4 100644
--- 
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java
+++ 
b/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java
@@ -19,29 +19,29 @@ package org.apache.gobblin.writer;
 import java.io.IOException;
 import java.util.Optional;
 
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.util.ForkOperatorUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageType;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 
-import parquet.column.ParquetProperties;
-import parquet.example.data.Group;
-import parquet.hadoop.ParquetWriter;
-import parquet.hadoop.example.GroupWriteSupport;
-import parquet.hadoop.metadata.CompressionCodecName;
-import parquet.schema.MessageType;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.ForkOperatorUtils;
 
 import static org.apache.gobblin.configuration.ConfigurationKeys.LOCAL_FS_URI;
 import static 
org.apache.gobblin.configuration.ConfigurationKeys.WRITER_CODEC_TYPE;
 import static 
org.apache.gobblin.configuration.ConfigurationKeys.WRITER_FILE_SYSTEM_URI;
 import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_PREFIX;
-import static parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
-import static parquet.hadoop.ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED;
-import static parquet.hadoop.ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED;
-import static parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
+import static 
org.apache.parquet.hadoop.ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED;
+import static 
org.apache.parquet.hadoop.ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE;
 
 
 public class ParquetDataWriterBuilder extends FsDataWriterBuilder<MessageType, 
Group> {
diff --git 
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java
 
b/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java
similarity index 95%
copy from 
gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java
copy to 
gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java
index a775bc2..8a2fc9e 100644
--- 
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java
+++ 
b/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java
@@ -19,12 +19,12 @@ package org.apache.gobblin.writer;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetWriter;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 
-import parquet.example.data.Group;
-import parquet.hadoop.ParquetWriter;
-
 
 /**
  * An extension to {@link FsDataWriter} that writes in Parquet format in the 
form of {@link Group}s.
diff --git 
a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverterTest.java
 
b/gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverterTest.java
similarity index 98%
copy from 
gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverterTest.java
copy to 
gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverterTest.java
index da9a7ce..d714680 100644
--- 
a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverterTest.java
+++ 
b/gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverterTest.java
@@ -19,11 +19,8 @@ package org.apache.gobblin.converter.parquet;
 import java.io.InputStreamReader;
 import java.lang.reflect.Type;
 
-import org.apache.gobblin.configuration.SourceState;
-import org.apache.gobblin.configuration.WorkUnitState;
-import org.apache.gobblin.converter.DataConversionException;
-import org.apache.gobblin.converter.SchemaConversionException;
-import org.apache.gobblin.source.workunit.Extract;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.schema.MessageType;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
@@ -31,8 +28,11 @@ import com.google.gson.Gson;
 import com.google.gson.JsonObject;
 import com.google.gson.reflect.TypeToken;
 
-import parquet.example.data.Group;
-import parquet.schema.MessageType;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.converter.DataConversionException;
+import org.apache.gobblin.converter.SchemaConversionException;
+import org.apache.gobblin.source.workunit.Extract;
 
 import static org.testng.Assert.assertEquals;
 
diff --git 
a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java
 
b/gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java
similarity index 93%
copy from 
gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java
copy to 
gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java
index 40c638c..46b41d7 100644
--- 
a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java
+++ 
b/gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java
@@ -22,8 +22,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.State;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
@@ -32,13 +30,16 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import parquet.example.data.Group;
-import parquet.example.data.simple.convert.GroupRecordConverter;
-import parquet.hadoop.ParquetReader;
-import parquet.hadoop.api.InitContext;
-import parquet.hadoop.api.ReadSupport;
-import parquet.io.api.RecordMaterializer;
-import parquet.schema.MessageType;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.MessageType;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
 
 import static 
org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_DICTIONARY;
 import static 
org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_DICTIONARY_PAGE_SIZE;
@@ -91,6 +92,7 @@ public class ParquetHdfsDataWriterTest {
     properties.setProp(WRITER_PARQUET_DICTIONARY_PAGE_SIZE, 1024);
     properties.setProp(WRITER_PARQUET_PAGE_SIZE, 1024);
     properties.setProp(WRITER_PARQUET_VALIDATE, true);
+    properties.setProp(ConfigurationKeys.WRITER_CODEC_TYPE, "gzip");
     return properties;
   }
 
diff --git 
a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/TestConstants.java
 
b/gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/writer/TestConstants.java
similarity index 86%
copy from 
gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/TestConstants.java
copy to 
gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/writer/TestConstants.java
index 6144aaf..41d7ee3 100644
--- 
a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/TestConstants.java
+++ 
b/gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/writer/TestConstants.java
@@ -16,12 +16,12 @@
  */
 package org.apache.gobblin.writer;
 
-import parquet.example.data.Group;
-import parquet.example.data.simple.SimpleGroup;
-import parquet.schema.MessageType;
-import parquet.schema.OriginalType;
-import parquet.schema.PrimitiveType;
-import parquet.schema.Types;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroup;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
 
 
 public class TestConstants {
@@ -37,7 +37,7 @@ public class TestConstants {
 
   public static final String TEST_FS_URI = "file:///";
 
-  public static final String TEST_ROOT_DIR = 
System.getProperty("java.io.tmpdir");
+  public static final String TEST_ROOT_DIR = 
System.getProperty("java.io.tmpdir") + "/" + System.currentTimeMillis();
 
   public static final String TEST_STAGING_DIR = TEST_ROOT_DIR + "/staging";
 
diff --git 
a/gobblin-modules/gobblin-parquet-apache/src/test/resources/converter/JsonIntermediateToParquetConverter.json
 
b/gobblin-modules/gobblin-parquet-apache/src/test/resources/converter/JsonIntermediateToParquetConverter.json
new file mode 100644
index 0000000..bbd7344
--- /dev/null
+++ 
b/gobblin-modules/gobblin-parquet-apache/src/test/resources/converter/JsonIntermediateToParquetConverter.json
@@ -0,0 +1,259 @@
+{
+  "simplePrimitiveTypes": {
+    "record": {
+      "a": 5,
+      "b": 5.0,
+      "c": 8.0,
+      "d": true,
+      "e": "somestring",
+      "f": "2018-01-01",
+      "g": 1545083047
+    },
+    "schema": [
+      {
+        "columnName": "a",
+        "dataType": {
+          "type": "int"
+        }
+      },
+      {
+        "columnName": "b",
+        "dataType": {
+          "type": "float"
+        }
+      },
+      {
+        "columnName": "c",
+        "dataType": {
+          "type": "double"
+        }
+      },
+      {
+        "columnName": "d",
+        "dataType": {
+          "type": "boolean"
+        }
+      },
+      {
+        "columnName": "e",
+        "dataType": {
+          "type": "string"
+        }
+      },
+      {
+        "columnName": "f",
+        "dataType": {
+          "type": "date"
+        }
+      },
+      {
+        "columnName": "g",
+        "dataType": {
+          "type": "timestamp"
+        }
+      }
+    ],
+    "expectedRecord": "a: 5 ; b: 5.0 ; c: 8.0 ; d: true ; e: somestring ; f: 
2018-01-01 ; g: 1545083047 ;",
+    "expectedSchema": "message test_table{ ; required int32 a ;  ; required 
float b ;  ; required double c ;  ; required boolean d ;  ; required binary e 
(UTF8) ;  ; required binary f (UTF8) ;  ; required binary g (UTF8) ;  ; } ; "
+  },
+  "array": {
+    "record": {
+      "somearray": [
+        1,
+        2,
+        3
+      ],
+      "somearray1": [
+        1,
+        2,
+        3
+      ],
+      "somearray2": [
+        1.0,
+        2.0,
+        3.0
+      ],
+      "somearray3": [
+        1.0,
+        2.0,
+        3.0
+      ],
+      "somearray4": [
+        true,
+        false,
+        true
+      ],
+      "somearray5": [
+        "hello",
+        "world"
+      ]
+    },
+    "schema": [
+      {
+        "columnName": "somearray",
+        "dataType": {
+          "type": "array",
+          "items": "int"
+        },
+        "isNullable": true
+      },
+      {
+        "columnName": "somearray1",
+        "dataType": {
+          "type": "array",
+          "items": "long"
+        }
+      },
+      {
+        "columnName": "somearray2",
+        "dataType": {
+          "type": "array",
+          "items": "float"
+        }
+      },
+      {
+        "columnName": "somearray3",
+        "dataType": {
+          "type": "array",
+          "items": "double"
+        }
+      },
+      {
+        "columnName": "somearray4",
+        "dataType": {
+          "type": "array",
+          "items": "boolean"
+        }
+      },
+      {
+        "columnName": "somearray5",
+        "dataType": {
+          "type": "array",
+          "items": "string"
+        }
+      }
+    ],
+    "expectedRecord": "somearray ;  item:1 ;  item:2 ;  item:3 ; somearray1 ;  
item:1 ;  item:2 ;  item:3 ; somearray2 ;  item:1.0 ;  item:2.0 ;  item:3.0 ; 
somearray3 ;  item:1.0 ;  item:2.0 ;  item:3.0 ; somearray4 ;  item:true ;  
item:false ;  item:true ; somearray5 ;  item:hello ;  item:world ; ",
+    "expectedSchema": "message test_table {  ;  optional group somearray {  ; 
repeated int32 item ;  ; } ;  required groupsomearray1 {  ; repeated int64 item 
;  ; } ;  required groupsomearray2 {  ; repeated float item ;  ; } ;  required 
groupsomearray3 {  ; repeated double item ;  ; } ;  required groupsomearray4 {  
; repeated boolean item ;  ; } ;  required groupsomearray5 {  ; repeated binary 
item(UTF8) ;  ; } ; } ; "
+  },
+  "enum": {
+    "record": {
+      "some_enum": "HELLO"
+    },
+    "schema": [
+      {
+        "columnName": "some_enum",
+        "dataType": {
+          "type": "enum",
+          "symbols": [
+            "HELLO",
+            "WORLD"
+          ]
+        },
+        "isNullable": true
+      }
+    ],
+    "expectedRecord": "some_enum : HELLO ;",
+    "expectedSchema": "message test_table { ; optional binary some_enum (UTF8) 
;; } ;"
+  },
+  "enum1": {
+    "record": {
+      "some_enum": "HELLO"
+    },
+    "schema": [
+      {
+        "columnName": "some_enum",
+        "dataType": {
+          "type": "enum",
+          "symbols": [
+            "HELLO",
+            "WORLD"
+          ]
+        },
+        "isNullable": false
+      }
+    ],
+    "expectedRecord": "some_enum : HELLO ;",
+    "expectedSchema": "message test_table { ; required binary some_enum (UTF8) 
;; } ;"
+  },
+  "record": {
+    "record": {
+      "some_record": {
+        "name": "me",
+        "age": 22,
+        "some_array": [
+          3,
+          4,
+          5
+        ]
+      }
+    },
+    "schema": [
+      {
+        "columnName": "some_record",
+        "dataType": {
+          "type": "record",
+          "values": [
+            {
+              "columnName": "name",
+              "dataType": {
+                "type": "string"
+              }
+            },
+            {
+              "columnName": "age",
+              "dataType": {
+                "type": "long"
+              }
+            },
+            {
+              "columnName": "some_array",
+              "dataType": {
+                "type": "array",
+                "items": "int"
+              }
+            }
+          ]
+        }
+      }
+    ],
+    "expectedRecord": "some_record ; name:me ; age:22 ; some_array ; item:3 ; 
item:4 ; item:5 ;",
+    "expectedSchema": "message test_table {  ; required group some_record {  ; 
required binary name (UTF8) ;  ; required int64 age ;  ; required group 
some_array {  ; repeated int32 item ; ; } ; } ;  } ; "
+  },
+  "map": {
+    "schema": [
+      {
+        "columnName": "cityToCountry",
+        "dataType": {
+          "type": "map",
+          "values": "string"
+        }
+      }
+    ],
+    "record": {
+      "cityToCountry": {
+        "ny": "US",
+        "london": "UK",
+        "delhi": "India"
+      }
+    },
+    "expectedRecord": "cityToCountry; map; key:ny;value:US; map; 
key:london;value:UK; map; key:delhi;value:India;",
+    "expectedSchema": "message test_table {  ;   required groupcityToCountry { 
 ;  repeated group map {  ;   required binary key (UTF8) ;   ;   required 
binary value (UTF8) ;   ;  } ;  } ;  } ;"
+  },
+  "nullValueInOptionalField": {
+    "record": {
+      "a": null
+    },
+    "schema": [
+      {
+        "columnName": "a",
+        "isNullable": true,
+        "dataType": {
+          "type": "int"
+        }
+      }
+    ],
+    "expectedRecord": "",
+    "expectedSchema": "message test_table {; optional int32 a ;; };"
+  }
+}
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-parquet/build.gradle 
b/gobblin-modules/gobblin-parquet-common/build.gradle
similarity index 97%
copy from gobblin-modules/gobblin-parquet/build.gradle
copy to gobblin-modules/gobblin-parquet-common/build.gradle
index 75530b9..055eef2 100644
--- a/gobblin-modules/gobblin-parquet/build.gradle
+++ b/gobblin-modules/gobblin-parquet-common/build.gradle
@@ -21,7 +21,6 @@ dependencies {
   compile project(":gobblin-core")
 
   compile externalDependency.gson
-  compile externalDependency.parquet
 
   testCompile externalDependency.testng
   testCompile externalDependency.mockito
diff --git 
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonSchema.java
 
b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/converter/parquet/JsonSchema.java
similarity index 94%
rename from 
gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonSchema.java
rename to 
gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/converter/parquet/JsonSchema.java
index 626551d..55c3a52 100644
--- 
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonSchema.java
+++ 
b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/converter/parquet/JsonSchema.java
@@ -16,19 +16,15 @@
  */
 package org.apache.gobblin.converter.parquet;
 
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.source.extractor.schema.Schema;
-
 import com.google.gson.JsonArray;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonPrimitive;
 
-import parquet.schema.Type.Repetition;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.source.extractor.schema.Schema;
 
 import static org.apache.gobblin.converter.parquet.JsonSchema.InputType.ENUM;
 import static org.apache.gobblin.converter.parquet.JsonSchema.InputType.RECORD;
-import static parquet.schema.Type.Repetition.OPTIONAL;
-import static parquet.schema.Type.Repetition.REQUIRED;
 
 
 /**
@@ -133,12 +129,10 @@ public class JsonSchema extends Schema {
   }
 
   /**
-   * Parquet {@link Repetition} for this {@link JsonSchema}.
+   * Parquet {@link RepetitionType} for this {@link JsonSchema}.
    * @return
    */
-  public Repetition optionalOrRequired() {
-    return this.isNullable() ? OPTIONAL : REQUIRED;
-  }
+  //public abstract RepetitionType optionalOrRequired();
 
   /**
    * Set properties for {@link JsonSchema} from a {@link JsonObject}.
diff --git a/gobblin-modules/gobblin-parquet/build.gradle 
b/gobblin-modules/gobblin-parquet/build.gradle
index 75530b9..fb56bb1 100644
--- a/gobblin-modules/gobblin-parquet/build.gradle
+++ b/gobblin-modules/gobblin-parquet/build.gradle
@@ -19,9 +19,10 @@ apply plugin: 'java'
 
 dependencies {
   compile project(":gobblin-core")
+  compile project(":gobblin-modules:gobblin-parquet-common")
 
   compile externalDependency.gson
-  compile externalDependency.parquet
+  compile externalDependency.twitterParquet
 
   testCompile externalDependency.testng
   testCompile externalDependency.mockito
diff --git 
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonElementConversionFactory.java
 
b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonElementConversionFactory.java
index 288a5de..44cb31e 100644
--- 
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonElementConversionFactory.java
+++ 
b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonElementConversionFactory.java
@@ -22,8 +22,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.gobblin.converter.parquet.JsonSchema.*;
-
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
@@ -43,15 +41,18 @@ import parquet.schema.PrimitiveType.PrimitiveTypeName;
 import parquet.schema.Type;
 import parquet.schema.Types;
 
-import static 
org.apache.gobblin.converter.parquet.JsonElementConversionFactory.RecordConverter.RecordType.CHILD;
+import org.apache.gobblin.converter.parquet.JsonSchema.*;
+
 import static org.apache.gobblin.converter.parquet.JsonSchema.*;
 import static org.apache.gobblin.converter.parquet.JsonSchema.InputType.STRING;
+import static 
org.apache.gobblin.converter.parquet.JsonElementConversionFactory.RecordConverter.RecordType.CHILD;
 import static parquet.schema.OriginalType.UTF8;
 import static parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
 import static parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
 import static parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
 import static parquet.schema.Type.Repetition.OPTIONAL;
 import static parquet.schema.Type.Repetition.REPEATED;
+import static parquet.schema.Type.Repetition.REQUIRED;
 
 
 /**
@@ -177,7 +178,7 @@ public class JsonElementConversionFactory {
     }
 
     protected Type buildSchema() {
-      return new PrimitiveType(this.repeated ? REPEATED : 
this.jsonSchema.optionalOrRequired(), this.outputType,
+      return new PrimitiveType(this.repeated ? REPEATED : 
optionalOrRequired(this.jsonSchema), this.outputType,
           this.jsonSchema.getColumnName());
     }
 
@@ -294,7 +295,7 @@ public class JsonElementConversionFactory {
       if (this.repeated) {
         return Types.repeated(BINARY).as(UTF8).named(columnName);
       }
-      switch (this.jsonSchema.optionalOrRequired()) {
+      switch (optionalOrRequired(this.jsonSchema)) {
         case OPTIONAL:
           return Types.optional(BINARY).as(UTF8).named(columnName);
         case REQUIRED:
@@ -305,6 +306,10 @@ public class JsonElementConversionFactory {
     }
   }
 
+  public static Type.Repetition optionalOrRequired(JsonSchema jsonBaseSchema) {
+    return jsonBaseSchema.isNullable() ? OPTIONAL : REQUIRED;
+  }
+
   public static class ArrayConverter extends CollectionConverter {
 
     public ArrayConverter(JsonSchema arraySchema) {
@@ -325,7 +330,7 @@ public class JsonElementConversionFactory {
     protected Type buildSchema() {
       List<Type> fields = new ArrayList<>();
       fields.add(0, this.elementConverter.schema());
-      return new GroupType(this.jsonSchema.optionalOrRequired(), 
this.jsonSchema.getColumnName(), fields);
+      return new GroupType(optionalOrRequired(jsonSchema), 
this.jsonSchema.getColumnName(), fields);
     }
 
     @Override
@@ -396,7 +401,7 @@ public class JsonElementConversionFactory {
         JsonElementConverter converter = this.converters.get(key);
         Object convertedValue = converter.convert(entry.getValue());
         boolean valueIsNull = convertedValue == null;
-        Type.Repetition repetition = converter.jsonSchema.optionalOrRequired();
+        Type.Repetition repetition = optionalOrRequired(converter.jsonSchema);
         if (valueIsNull && repetition.equals(OPTIONAL)) {
           continue;
         }
@@ -422,7 +427,7 @@ public class JsonElementConversionFactory {
         case ROOT:
           return new MessageType(docName, parquetTypes);
         case CHILD:
-          return new GroupType(this.jsonSchema.optionalOrRequired(), docName, 
parquetTypes);
+          return new GroupType(optionalOrRequired(this.jsonSchema), docName, 
parquetTypes);
         default:
           throw new RuntimeException("Unsupported Record type");
       }
@@ -463,7 +468,7 @@ public class JsonElementConversionFactory {
           Types.repeatedGroup().addFields(keyConverter.schema(), 
elementConverter.schema()).named(MAP_KEY)
               .asGroupType();
       String columnName = this.jsonSchema.getColumnName();
-      switch (this.jsonSchema.optionalOrRequired()) {
+      switch (optionalOrRequired(this.jsonSchema)) {
         case OPTIONAL:
           return 
Types.optionalGroup().addFields(mapGroup).named(columnName).asGroupType();
         case REQUIRED:
diff --git 
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverter.java
 
b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverter.java
index b04dcf8..328d86d 100644
--- 
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverter.java
+++ 
b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverter.java
@@ -16,6 +16,12 @@
  */
 package org.apache.gobblin.converter.parquet;
 
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+
+import parquet.example.data.Group;
+import parquet.schema.MessageType;
+
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.converter.Converter;
 import org.apache.gobblin.converter.DataConversionException;
@@ -23,12 +29,6 @@ import 
org.apache.gobblin.converter.SchemaConversionException;
 import org.apache.gobblin.converter.SingleRecordIterable;
 import 
org.apache.gobblin.converter.parquet.JsonElementConversionFactory.RecordConverter;
 
-import com.google.gson.JsonArray;
-import com.google.gson.JsonObject;
-
-import parquet.example.data.Group;
-import parquet.schema.MessageType;
-
 import static 
org.apache.gobblin.converter.parquet.JsonElementConversionFactory.RecordConverter.RecordType.ROOT;
 
 
diff --git 
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/ParquetGroup.java
 
b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/ParquetGroup.java
index 783d845..f2e0a99 100644
--- 
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/ParquetGroup.java
+++ 
b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/ParquetGroup.java
@@ -83,6 +83,7 @@ public class ParquetGroup extends Group {
     return result.toString();
   }
 
+  @Override
   public Group addGroup(int fieldIndex) {
     ParquetGroup g = new 
ParquetGroup(this.schema.getType(fieldIndex).asGroupType());
     this.data[fieldIndex].add(g);
diff --git 
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java
 
b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java
index 7ce2020..4a47792 100644
--- 
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java
+++ 
b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java
@@ -19,8 +19,6 @@ package org.apache.gobblin.writer;
 import java.io.IOException;
 import java.util.Optional;
 
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.util.ForkOperatorUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
@@ -34,6 +32,9 @@ import parquet.hadoop.example.GroupWriteSupport;
 import parquet.hadoop.metadata.CompressionCodecName;
 import parquet.schema.MessageType;
 
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.ForkOperatorUtils;
+
 import static org.apache.gobblin.configuration.ConfigurationKeys.LOCAL_FS_URI;
 import static 
org.apache.gobblin.configuration.ConfigurationKeys.WRITER_CODEC_TYPE;
 import static 
org.apache.gobblin.configuration.ConfigurationKeys.WRITER_FILE_SYSTEM_URI;
diff --git 
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java
 
b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java
index a775bc2..744c784 100644
--- 
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java
+++ 
b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java
@@ -19,12 +19,12 @@ package org.apache.gobblin.writer;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.State;
-
 import parquet.example.data.Group;
 import parquet.hadoop.ParquetWriter;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+
 
 /**
  * An extension to {@link FsDataWriter} that writes in Parquet format in the 
form of {@link Group}s.
diff --git 
a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverterTest.java
 
b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverterTest.java
index da9a7ce..fd828fa 100644
--- 
a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverterTest.java
+++ 
b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverterTest.java
@@ -19,11 +19,6 @@ package org.apache.gobblin.converter.parquet;
 import java.io.InputStreamReader;
 import java.lang.reflect.Type;
 
-import org.apache.gobblin.configuration.SourceState;
-import org.apache.gobblin.configuration.WorkUnitState;
-import org.apache.gobblin.converter.DataConversionException;
-import org.apache.gobblin.converter.SchemaConversionException;
-import org.apache.gobblin.source.workunit.Extract;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
@@ -34,6 +29,12 @@ import com.google.gson.reflect.TypeToken;
 import parquet.example.data.Group;
 import parquet.schema.MessageType;
 
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.converter.DataConversionException;
+import org.apache.gobblin.converter.SchemaConversionException;
+import org.apache.gobblin.source.workunit.Extract;
+
 import static org.testng.Assert.assertEquals;
 
 
diff --git 
a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java
 
b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java
index 40c638c..086b084 100644
--- 
a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java
+++ 
b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java
@@ -22,8 +22,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.State;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
@@ -40,6 +38,9 @@ import parquet.hadoop.api.ReadSupport;
 import parquet.io.api.RecordMaterializer;
 import parquet.schema.MessageType;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+
 import static 
org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_DICTIONARY;
 import static 
org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_DICTIONARY_PAGE_SIZE;
 import static 
org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_PAGE_SIZE;
@@ -91,6 +92,7 @@ public class ParquetHdfsDataWriterTest {
     properties.setProp(WRITER_PARQUET_DICTIONARY_PAGE_SIZE, 1024);
     properties.setProp(WRITER_PARQUET_PAGE_SIZE, 1024);
     properties.setProp(WRITER_PARQUET_VALIDATE, true);
+    properties.setProp(ConfigurationKeys.WRITER_CODEC_TYPE, "gzip");
     return properties;
   }
 
diff --git 
a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/TestConstants.java
 
b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/TestConstants.java
index 6144aaf..e5bf215 100644
--- 
a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/TestConstants.java
+++ 
b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/TestConstants.java
@@ -37,7 +37,7 @@ public class TestConstants {
 
   public static final String TEST_FS_URI = "file:///";
 
-  public static final String TEST_ROOT_DIR = 
System.getProperty("java.io.tmpdir");
+  public static final String TEST_ROOT_DIR = 
System.getProperty("java.io.tmpdir") + "/" + System.currentTimeMillis();
 
   public static final String TEST_STAGING_DIR = TEST_ROOT_DIR + "/staging";
 
diff --git a/gobblin-test-harness/build.gradle 
b/gobblin-test-harness/build.gradle
index e231ced..48172d2 100644
--- a/gobblin-test-harness/build.gradle
+++ b/gobblin-test-harness/build.gradle
@@ -28,6 +28,7 @@ dependencies {
   testCompile externalDependency.calciteAvatica
   testCompile externalDependency.jhyde
   testCompile externalDependency.testng
+  testCompile externalDependency.twitterParquet
 }
 
 configurations { compile { transitive = false } }
diff --git 
a/gobblin-test-harness/src/test/java/org/apache/gobblin/GobblinLocalJobLauncherUtils.java
 
b/gobblin-test-harness/src/test/java/org/apache/gobblin/GobblinLocalJobLauncherUtils.java
index 8bbabb7..d306ed8 100644
--- 
a/gobblin-test-harness/src/test/java/org/apache/gobblin/GobblinLocalJobLauncherUtils.java
+++ 
b/gobblin-test-harness/src/test/java/org/apache/gobblin/GobblinLocalJobLauncherUtils.java
@@ -23,13 +23,15 @@ import java.util.UUID;
 
 import org.apache.commons.io.FileUtils;
 
+import com.google.common.io.Files;
+
 import org.apache.gobblin.runtime.app.ApplicationLauncher;
 import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
 import org.apache.gobblin.runtime.local.LocalJobLauncher;
 
 
 public class GobblinLocalJobLauncherUtils {
-  public static final String RESOURCE_DIR = 
"./gobblin-test-harness/src/test/resources/runtime_test/";
+  public static final String RESOURCE_DIR = 
Files.createTempDir().getAbsolutePath()+"/";
   public static final String SAMPLE_DIR = "test_data/daily/2016/10/01/";
   public static final String DATA_PURGER_COMMIT_DATA = 
"data.purger.commit.data";
   public static final String STATE_STORE = "state_store";
diff --git 
a/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskSkipErrRecordsIntegrationTest.java
 
b/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskSkipErrRecordsIntegrationTest.java
index 32b1836..a2044b6 100644
--- 
a/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskSkipErrRecordsIntegrationTest.java
+++ 
b/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskSkipErrRecordsIntegrationTest.java
@@ -18,15 +18,18 @@ package org.apache.gobblin;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URL;
 import java.util.Properties;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.JobException;
+import org.testng.Assert;
 import org.testng.annotations.AfterTest;
 import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.JobException;
+
 
 @Test
 public class TaskSkipErrRecordsIntegrationTest {
@@ -70,7 +73,10 @@ public class TaskSkipErrRecordsIntegrationTest {
       throws IOException {
     Properties jobProperties =
         
GobblinLocalJobLauncherUtils.getJobProperties("runtime_test/task_skip_err_records.properties");
-    FileUtils.copyFile(new File(GobblinLocalJobLauncherUtils.RESOURCE_DIR + 
SAMPLE_FILE),
+    URL resource = getClass().getClassLoader().getResource("runtime_test/" + 
SAMPLE_FILE);
+    Assert.assertNotNull(resource, "Sample file not found");
+    File sampleFile = new File(resource.getFile());
+    FileUtils.copyFile(sampleFile,
         new File(GobblinLocalJobLauncherUtils.RESOURCE_DIR + 
GobblinLocalJobLauncherUtils.SAMPLE_DIR + SAMPLE_FILE));
     jobProperties.setProperty(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL,
         GobblinLocalJobLauncherUtils.RESOURCE_DIR + 
GobblinLocalJobLauncherUtils.SAMPLE_DIR + SAMPLE_FILE);
diff --git 
a/gobblin-test-harness/src/test/java/org/apache/gobblin/WriterOutputFormatIntegrationTest.java
 
b/gobblin-test-harness/src/test/java/org/apache/gobblin/WriterOutputFormatIntegrationTest.java
index 54a78aa..bfb8a5d 100644
--- 
a/gobblin-test-harness/src/test/java/org/apache/gobblin/WriterOutputFormatIntegrationTest.java
+++ 
b/gobblin-test-harness/src/test/java/org/apache/gobblin/WriterOutputFormatIntegrationTest.java
@@ -18,13 +18,17 @@ package org.apache.gobblin;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URL;
 import java.util.Properties;
 
 import org.apache.commons.io.FileUtils;
+import org.testng.Assert;
 import org.testng.annotations.AfterTest;
 import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
 
+import com.google.common.io.Files;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.hive.HiveSerDeWrapper;
 
@@ -70,10 +74,19 @@ public class WriterOutputFormatIntegrationTest {
       throws IOException {
     Properties jobProperties =
         
GobblinLocalJobLauncherUtils.getJobProperties("runtime_test/writer_output_format_test.properties");
-    FileUtils.copyFile(new File(GobblinLocalJobLauncherUtils.RESOURCE_DIR + 
SAMPLE_FILE),
-        new File(GobblinLocalJobLauncherUtils.RESOURCE_DIR + 
GobblinLocalJobLauncherUtils.SAMPLE_DIR + SAMPLE_FILE));
+    URL resource = getClass().getClassLoader().getResource("runtime_test/" + 
SAMPLE_FILE);
+    Assert.assertNotNull(resource, "Sample file should be present");
+    File sampleFile = new File(resource.getFile());
+    File testFile = File.createTempFile("writerTest", ".avro");
+    FileUtils.copyFile(sampleFile, testFile);
     jobProperties.setProperty(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL,
-        GobblinLocalJobLauncherUtils.RESOURCE_DIR + 
GobblinLocalJobLauncherUtils.SAMPLE_DIR + SAMPLE_FILE);
+        testFile.getAbsolutePath());
+
+    String outputRootDirectory = Files.createTempDir().getAbsolutePath() + "/";
+    jobProperties.setProperty(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, 
outputRootDirectory + "state_store");
+    jobProperties.setProperty(ConfigurationKeys.WRITER_STAGING_DIR, 
outputRootDirectory + "writer_staging");
+    jobProperties.setProperty(ConfigurationKeys.WRITER_OUTPUT_DIR, 
outputRootDirectory + "writer_output");
+    jobProperties.setProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, 
outputRootDirectory + "final_dir");
     return jobProperties;
   }
 }
diff --git 
a/gobblin-test-harness/src/test/resources/runtime_test/writer_output_format_test.properties
 
b/gobblin-test-harness/src/test/resources/runtime_test/writer_output_format_test.properties
index 1c0ac20..9dca5e3 100644
--- 
a/gobblin-test-harness/src/test/resources/runtime_test/writer_output_format_test.properties
+++ 
b/gobblin-test-harness/src/test/resources/runtime_test/writer_output_format_test.properties
@@ -28,15 +28,10 @@ writer.destination.type=HDFS
 writer.eager.initialization=true
 writer.file.path=output
 
-state.store.dir=./gobblin-test-harness/src/test/resources/runtime_test/state_store
-writer.staging.dir=./gobblin-test-harness/src/test/resources/runtime_test/writer_staging
-writer.output.dir=./gobblin-test-harness/src/test/resources/runtime_test/writer_output
-data.publisher.final.dir=./gobblin-test-harness/src/test/resources/runtime_test/final_dir
-
 
converter.classes=org.apache.gobblin.converter.avro.AvroRecordToAvroWritableConverter,org.apache.gobblin.converter.serde.HiveSerDeConverter
 serde.deserializer.type=AVRO
 
writer.builder.class=org.apache.gobblin.writer.HiveWritableHdfsDataWriterBuilder
 fork.record.queue.capacity=1
 
avro.schema.literal={"type":"record","name":"User","namespace":"example.avro","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}
 
-source.class=org.apache.gobblin.TestAvroSource
\ No newline at end of file
+source.class=org.apache.gobblin.TestAvroSource
diff --git a/gradle/scripts/dependencyDefinitions.gradle 
b/gradle/scripts/dependencyDefinitions.gradle
index 8ef4b3a..f2836f7 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -168,7 +168,8 @@ ext.externalDependency = [
     "grok": "io.thekraken:grok:0.1.5",
     "hadoopAdl" : "org.apache.hadoop:hadoop-azure-datalake:3.0.0-alpha2",
     "orcMapreduce":"org.apache.orc:orc-mapreduce:1.6.1",
-    'parquet': 'com.twitter:parquet-hadoop-bundle:1.5.0',
+    'parquet': 'org.apache.parquet:parquet-hadoop:1.10.1',
+    'twitterParquet': 'com.twitter:parquet-hadoop-bundle:1.5.0',
     'reactivex': 'io.reactivex.rxjava2:rxjava:2.1.0',
     "slf4j": [
         "org.slf4j:slf4j-api:" + slf4jVersion,

Reply via email to