This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4483268  [GOBBLIN-1008] Upgrading parquet dependency to org.apache.parquet. Fixing tests
4483268 is described below
commit 4483268e3885adb63da7253df5ab0c00eb86aece
Author: Shirshanka Das <[email protected]>
AuthorDate: Tue Dec 24 21:15:39 2019 -0800
[GOBBLIN-1008] Upgrading parquet dependency to org.apache.parquet. Fixing tests
Closes #2853 from shirshanka/parquet-upgrade
---
.../finder/HdfsModifiedTimeHiveVersionFinder.java | 4 +-
gobblin-docs/sinks/ParquetHdfsDataWriter.md | 26 ++-
.../build.gradle | 1 +
.../parquet/JsonElementConversionFactory.java | 63 ++---
.../JsonIntermediateToParquetGroupConverter.java | 11 +-
.../gobblin/converter/parquet/ParquetGroup.java | 56 +++--
.../gobblin/writer/ParquetDataWriterBuilder.java | 24 +-
.../gobblin/writer/ParquetHdfsDataWriter.java | 6 +-
...sonIntermediateToParquetGroupConverterTest.java | 14 +-
.../gobblin/writer/ParquetHdfsDataWriterTest.java | 20 +-
.../org/apache/gobblin/writer/TestConstants.java | 14 +-
.../JsonIntermediateToParquetConverter.json | 259 +++++++++++++++++++++
.../build.gradle | 1 -
.../gobblin/converter/parquet/JsonSchema.java | 14 +-
gobblin-modules/gobblin-parquet/build.gradle | 3 +-
.../parquet/JsonElementConversionFactory.java | 23 +-
.../JsonIntermediateToParquetGroupConverter.java | 12 +-
.../gobblin/converter/parquet/ParquetGroup.java | 1 +
.../gobblin/writer/ParquetDataWriterBuilder.java | 5 +-
.../gobblin/writer/ParquetHdfsDataWriter.java | 6 +-
...sonIntermediateToParquetGroupConverterTest.java | 11 +-
.../gobblin/writer/ParquetHdfsDataWriterTest.java | 6 +-
.../org/apache/gobblin/writer/TestConstants.java | 2 +-
gobblin-test-harness/build.gradle | 1 +
.../gobblin/GobblinLocalJobLauncherUtils.java | 4 +-
.../gobblin/TaskSkipErrRecordsIntegrationTest.java | 12 +-
.../gobblin/WriterOutputFormatIntegrationTest.java | 19 +-
.../writer_output_format_test.properties | 7 +-
gradle/scripts/dependencyDefinitions.gradle | 3 +-
29 files changed, 475 insertions(+), 153 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/HdfsModifiedTimeHiveVersionFinder.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/HdfsModifiedTimeHiveVersionFinder.java
index 8a7cb87..c427962 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/HdfsModifiedTimeHiveVersionFinder.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/HdfsModifiedTimeHiveVersionFinder.java
@@ -18,15 +18,15 @@ package org.apache.gobblin.data.management.version.finder;
import java.io.IOException;
-import org.apache.gobblin.data.management.version.TimestampedHiveDatasetVersion;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.joda.time.DateTime;
+import com.google.common.base.Preconditions;
import com.typesafe.config.Config;
-import parquet.Preconditions;
+import org.apache.gobblin.data.management.version.TimestampedHiveDatasetVersion;
/**
diff --git a/gobblin-docs/sinks/ParquetHdfsDataWriter.md
b/gobblin-docs/sinks/ParquetHdfsDataWriter.md
index f3ad0da..a7a24ea 100644
--- a/gobblin-docs/sinks/ParquetHdfsDataWriter.md
+++ b/gobblin-docs/sinks/ParquetHdfsDataWriter.md
@@ -1,6 +1,6 @@
# Description
-An extension to [`FsDataWriter`](https://github.com/apache/incubator-gobblin/blob/master/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java) that writes in Parquet format in the form of [`Group.java`](https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/example/data/Group.java). This implementation allows users to specify the CodecFactory to use through the configuration property [`writer.codec.type`](https://gobblin.readthe [...]
+An extension to [`FsDataWriter`](https://github.com/apache/incubator-gobblin/blob/master/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java) that writes in Parquet format in the form of [`Group.java`](https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/example/data/Group.java). This implementation allows users to specify the CodecFactory to use through the configuration property [`writer.codec.type`](https://gobblin.readthe [...]
# Usage
```
@@ -8,10 +8,6 @@
writer.builder.class=org.apache.gobblin.writer.ParquetDataWriterBuilder
writer.destination.type=HDFS
writer.output.format=PARQUET
```
-For more info, see
-[`ParquetHdfsDataWriter`](https://github.com/apache/incubator-gobblin/blob/master/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java)
-and
-[`ParquetDataWriterBuilder`](https://github.com/apache/incubator-gobblin/blob/master/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java)
# Configuration
@@ -22,4 +18,22 @@ and
| writer.parquet.dictionary.page.size | The block size threshold for the dictionary pages. | 134217728 | No |
| writer.parquet.dictionary | To turn dictionary encoding on. Parquet has a dictionary encoding for data with a small number of unique values ( < 10^5 ) that aids in significant compression and boosts processing speed. | true | No |
| writer.parquet.validate | To turn on validation using the schema. This validation is done by [`ParquetWriter`](https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java) not by Gobblin. | false | No |
-| writer.parquet.version | Version of parquet writer to use. Available versions are v1 and v2. | v1 | No |
\ No newline at end of file
+| writer.parquet.version | Version of parquet writer to use. Available versions are v1 and v2. | v1 | No |
+
+# Developer Notes
+
+Gobblin provides integration with two different versions of Parquet through its modules. Use the appropriate jar based on the Parquet library you use in your code.
+
+| Jar | Dependency | Gobblin Release |
+|-----|-------------|--------|
+| [`gobblin-parquet`](https://mvnrepository.com/artifact/org.apache.gobblin/gobblin-parquet) | [`com.twitter:parquet-hadoop-bundle`](https://mvnrepository.com/artifact/com.twitter/parquet-hadoop-bundle) | >= 0.12.0 |
+| [`gobblin-parquet-apache`](https://mvnrepository.com/artifact/org.apache.gobblin/gobblin-parquet-apache) | [`org.apache.parquet:parquet-hadoop`](https://mvnrepository.com/artifact/org.apache.parquet/parquet-hadoop) | >= 0.15.0 |
+
+If you want to look at the code, check out:
+
+| Module | File |
+| ------ | ---- |
+| gobblin-parquet | [`ParquetHdfsDataWriter`](https://github.com/apache/incubator-gobblin/blob/master/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java) |
+| gobblin-parquet | [`ParquetDataWriterBuilder`](https://github.com/apache/incubator-gobblin/blob/master/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java) |
+| gobblin-parquet-apache | [`ParquetHdfsDataWriter`](https://github.com/apache/incubator-gobblin/blob/master/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java) |
+| gobblin-parquet-apache | [`ParquetDataWriterBuilder`](https://github.com/apache/incubator-gobblin/blob/master/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java) |
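[Editor's note] The keys documented in the configuration table above can also be set programmatically on a Gobblin `State`, the same way the updated tests in this patch do with `setProp`. A minimal sketch, using only property names that appear in this documentation and the patch; the values are illustrative, not recommendations:

```
// Minimal sketch: configuring the Parquet writer via Gobblin State properties.
// Property names come from the documentation above; values are illustrative.
import org.apache.gobblin.configuration.State;

public class ParquetWriterConfigSketch {
  public static State parquetWriterProps() {
    State props = new State();
    props.setProp("writer.builder.class", "org.apache.gobblin.writer.ParquetDataWriterBuilder");
    props.setProp("writer.destination.type", "HDFS");
    props.setProp("writer.output.format", "PARQUET");
    props.setProp("writer.parquet.dictionary.page.size", 134217728); // dictionary page threshold
    props.setProp("writer.parquet.dictionary", true);                // enable dictionary encoding
    props.setProp("writer.parquet.validate", false);                 // validation done by ParquetWriter
    props.setProp("writer.parquet.version", "v1");                   // v1 or v2
    props.setProp("writer.codec.type", "gzip");                      // CodecFactory selection
    return props;
  }
}
```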
diff --git a/gobblin-modules/gobblin-parquet/build.gradle
b/gobblin-modules/gobblin-parquet-apache/build.gradle
similarity index 96%
copy from gobblin-modules/gobblin-parquet/build.gradle
copy to gobblin-modules/gobblin-parquet-apache/build.gradle
index 75530b9..560638d 100644
--- a/gobblin-modules/gobblin-parquet/build.gradle
+++ b/gobblin-modules/gobblin-parquet-apache/build.gradle
@@ -19,6 +19,7 @@ apply plugin: 'java'
dependencies {
compile project(":gobblin-core")
+ compile project(":gobblin-modules:gobblin-parquet-common")
compile externalDependency.gson
compile externalDependency.parquet
diff --git
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonElementConversionFactory.java
b/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/converter/parquet/JsonElementConversionFactory.java
similarity index 88%
copy from
gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonElementConversionFactory.java
copy to
gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/converter/parquet/JsonElementConversionFactory.java
index 288a5de..822eb65 100644
---
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonElementConversionFactory.java
+++
b/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/converter/parquet/JsonElementConversionFactory.java
@@ -14,44 +14,45 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gobblin.converter.parquet;
+package org.apache.gobblin.converter.parquet;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import org.apache.gobblin.converter.parquet.JsonSchema.*;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.BinaryValue;
+import org.apache.parquet.example.data.simple.BooleanValue;
+import org.apache.parquet.example.data.simple.DoubleValue;
+import org.apache.parquet.example.data.simple.FloatValue;
+import org.apache.parquet.example.data.simple.IntegerValue;
+import org.apache.parquet.example.data.simple.LongValue;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
-import parquet.example.data.Group;
-import parquet.example.data.simple.BinaryValue;
-import parquet.example.data.simple.BooleanValue;
-import parquet.example.data.simple.DoubleValue;
-import parquet.example.data.simple.FloatValue;
-import parquet.example.data.simple.IntegerValue;
-import parquet.example.data.simple.LongValue;
-import parquet.io.api.Binary;
-import parquet.schema.GroupType;
-import parquet.schema.MessageType;
-import parquet.schema.PrimitiveType;
-import parquet.schema.PrimitiveType.PrimitiveTypeName;
-import parquet.schema.Type;
-import parquet.schema.Types;
+import org.apache.gobblin.converter.parquet.JsonSchema.*;
-import static org.apache.gobblin.converter.parquet.JsonElementConversionFactory.RecordConverter.RecordType.CHILD;
import static org.apache.gobblin.converter.parquet.JsonSchema.*;
import static org.apache.gobblin.converter.parquet.JsonSchema.InputType.STRING;
-import static parquet.schema.OriginalType.UTF8;
-import static parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
-import static parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
-import static parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
-import static parquet.schema.Type.Repetition.OPTIONAL;
-import static parquet.schema.Type.Repetition.REPEATED;
+import static org.apache.gobblin.converter.parquet.JsonElementConversionFactory.RecordConverter.RecordType.CHILD;
+import static org.apache.parquet.schema.OriginalType.UTF8;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
/**
@@ -177,7 +178,7 @@ public class JsonElementConversionFactory {
}
protected Type buildSchema() {
- return new PrimitiveType(this.repeated ? REPEATED : this.jsonSchema.optionalOrRequired(), this.outputType,
+ return new PrimitiveType(this.repeated ? REPEATED : optionalOrRequired(this.jsonSchema), this.outputType,
this.jsonSchema.getColumnName());
}
@@ -294,7 +295,7 @@ public class JsonElementConversionFactory {
if (this.repeated) {
return Types.repeated(BINARY).as(UTF8).named(columnName);
}
- switch (this.jsonSchema.optionalOrRequired()) {
+ switch (optionalOrRequired(this.jsonSchema)) {
case OPTIONAL:
return Types.optional(BINARY).as(UTF8).named(columnName);
case REQUIRED:
@@ -305,6 +306,10 @@ public class JsonElementConversionFactory {
}
}
+ public static Type.Repetition optionalOrRequired(JsonSchema jsonBaseSchema) {
+ return jsonBaseSchema.isNullable() ? OPTIONAL : REQUIRED;
+ }
+
public static class ArrayConverter extends CollectionConverter {
public ArrayConverter(JsonSchema arraySchema) {
@@ -325,7 +330,7 @@ public class JsonElementConversionFactory {
protected Type buildSchema() {
List<Type> fields = new ArrayList<>();
fields.add(0, this.elementConverter.schema());
- return new GroupType(this.jsonSchema.optionalOrRequired(), this.jsonSchema.getColumnName(), fields);
+ return new GroupType(optionalOrRequired(jsonSchema), this.jsonSchema.getColumnName(), fields);
}
@Override
@@ -396,7 +401,7 @@ public class JsonElementConversionFactory {
JsonElementConverter converter = this.converters.get(key);
Object convertedValue = converter.convert(entry.getValue());
boolean valueIsNull = convertedValue == null;
- Type.Repetition repetition = converter.jsonSchema.optionalOrRequired();
+ Type.Repetition repetition = optionalOrRequired(converter.jsonSchema);
if (valueIsNull && repetition.equals(OPTIONAL)) {
continue;
}
@@ -422,7 +427,7 @@ public class JsonElementConversionFactory {
case ROOT:
return new MessageType(docName, parquetTypes);
case CHILD:
- return new GroupType(this.jsonSchema.optionalOrRequired(), docName, parquetTypes);
+ return new GroupType(optionalOrRequired(this.jsonSchema), docName, parquetTypes);
default:
throw new RuntimeException("Unsupported Record type");
}
@@ -463,7 +468,7 @@ public class JsonElementConversionFactory {
Types.repeatedGroup().addFields(keyConverter.schema(),
elementConverter.schema()).named(MAP_KEY)
.asGroupType();
String columnName = this.jsonSchema.getColumnName();
- switch (this.jsonSchema.optionalOrRequired()) {
+ switch (optionalOrRequired(this.jsonSchema)) {
case OPTIONAL:
return Types.optionalGroup().addFields(mapGroup).named(columnName).asGroupType();
case REQUIRED:
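[Editor's note] The recurring change in this file is mechanical: `jsonSchema.optionalOrRequired()` moves out of `JsonSchema` (which the patch relocates into the parquet-agnostic `gobblin-parquet-common` module) and into the static helper `optionalOrRequired(JsonSchema)` added above, so the shared class no longer references a Parquet `Repetition` type. A minimal sketch, not part of the patch, of what that mapping produces when the schema builders above run, assuming the `org.apache.parquet` 1.10.x `Types` API this module now uses:

```
// Sketch: how nullability maps to Parquet repetition for a UTF8 string column,
// mirroring the OPTIONAL/REQUIRED switch in StringConverter above.
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

import static org.apache.parquet.schema.OriginalType.UTF8;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;

public class RepetitionSketch {
  // Stand-in for JsonSchema.isNullable(); the real helper takes a JsonSchema.
  static Type stringColumn(String name, boolean nullable) {
    return nullable
        ? Types.optional(BINARY).as(UTF8).named(name)   // optional binary <name> (UTF8)
        : Types.required(BINARY).as(UTF8).named(name);  // required binary <name> (UTF8)
  }

  public static void main(String[] args) {
    System.out.println(stringColumn("nickname", true));
    System.out.println(stringColumn("name", false));
  }
}
```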
diff --git
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverter.java
b/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverter.java
similarity index 96%
copy from
gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverter.java
copy to
gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverter.java
index b04dcf8..1eda5b7 100644
---
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverter.java
+++
b/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverter.java
@@ -15,6 +15,11 @@
* limitations under the License.
*/
package org.apache.gobblin.converter.parquet;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.schema.MessageType;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.converter.Converter;
@@ -23,12 +28,6 @@ import org.apache.gobblin.converter.SchemaConversionException;
import org.apache.gobblin.converter.SingleRecordIterable;
import org.apache.gobblin.converter.parquet.JsonElementConversionFactory.RecordConverter;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonObject;
-
-import parquet.example.data.Group;
-import parquet.schema.MessageType;
-
import static org.apache.gobblin.converter.parquet.JsonElementConversionFactory.RecordConverter.RecordType.ROOT;
diff --git
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/ParquetGroup.java
b/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/converter/parquet/ParquetGroup.java
similarity index 82%
copy from
gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/ParquetGroup.java
copy to
gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/converter/parquet/ParquetGroup.java
index 783d845..0b7e409 100644
---
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/ParquetGroup.java
+++
b/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/converter/parquet/ParquetGroup.java
@@ -15,27 +15,26 @@
* limitations under the License.
*/
package org.apache.gobblin.converter.parquet;
-
import java.util.ArrayList;
import java.util.List;
-import parquet.example.data.Group;
-import parquet.example.data.simple.BinaryValue;
-import parquet.example.data.simple.BooleanValue;
-import parquet.example.data.simple.DoubleValue;
-import parquet.example.data.simple.FloatValue;
-import parquet.example.data.simple.Int96Value;
-import parquet.example.data.simple.IntegerValue;
-import parquet.example.data.simple.LongValue;
-import parquet.example.data.simple.NanoTime;
-import parquet.example.data.simple.Primitive;
-import parquet.io.api.Binary;
-import parquet.io.api.RecordConsumer;
-import parquet.schema.GroupType;
-import parquet.schema.PrimitiveType;
-import parquet.schema.Type;
-
-import static parquet.schema.Type.Repetition.REPEATED;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.BinaryValue;
+import org.apache.parquet.example.data.simple.BooleanValue;
+import org.apache.parquet.example.data.simple.DoubleValue;
+import org.apache.parquet.example.data.simple.FloatValue;
+import org.apache.parquet.example.data.simple.Int96Value;
+import org.apache.parquet.example.data.simple.IntegerValue;
+import org.apache.parquet.example.data.simple.LongValue;
+import org.apache.parquet.example.data.simple.NanoTime;
+import org.apache.parquet.example.data.simple.Primitive;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
/**
@@ -83,6 +82,7 @@ public class ParquetGroup extends Group {
return result.toString();
}
+ @Override
public Group addGroup(int fieldIndex) {
ParquetGroup g = new
ParquetGroup(this.schema.getType(fieldIndex).asGroupType());
this.data[fieldIndex].add(g);
@@ -139,6 +139,21 @@ public class ParquetGroup extends Group {
return ((IntegerValue) this.getValue(fieldIndex, index)).getInteger();
}
+ @Override
+ public long getLong(int fieldIndex, int index) {
+ return ((LongValue) this.getValue(fieldIndex, index)).getLong();
+ }
+
+ @Override
+ public double getDouble(int fieldIndex, int index) {
+ return ((DoubleValue) this.getValue(fieldIndex, index)).getDouble();
+ }
+
+ @Override
+ public float getFloat(int fieldIndex, int index) {
+ return ((FloatValue) this.getValue(fieldIndex, index)).getFloat();
+ }
+
public boolean getBoolean(int fieldIndex, int index) {
return ((BooleanValue) this.getValue(fieldIndex, index)).getBoolean();
}
@@ -193,6 +208,11 @@ public class ParquetGroup extends Group {
this.add(fieldIndex, new DoubleValue(value));
}
+ @Override
+ public void add(int fieldIndex, Group value) {
+ this.data[fieldIndex].add(value);
+ }
+
public GroupType getType() {
return this.schema;
}
diff --git
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java
b/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java
similarity index 88%
copy from
gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java
copy to
gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java
index 7ce2020..ac4e2d4 100644
---
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java
+++
b/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java
@@ -19,29 +19,29 @@ package org.apache.gobblin.writer;
import java.io.IOException;
import java.util.Optional;
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.util.ForkOperatorUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageType;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
-import parquet.column.ParquetProperties;
-import parquet.example.data.Group;
-import parquet.hadoop.ParquetWriter;
-import parquet.hadoop.example.GroupWriteSupport;
-import parquet.hadoop.metadata.CompressionCodecName;
-import parquet.schema.MessageType;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.ForkOperatorUtils;
import static org.apache.gobblin.configuration.ConfigurationKeys.LOCAL_FS_URI;
import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_CODEC_TYPE;
import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_FILE_SYSTEM_URI;
import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_PREFIX;
-import static parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
-import static parquet.hadoop.ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED;
-import static parquet.hadoop.ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED;
-import static parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE;
public class ParquetDataWriterBuilder extends FsDataWriterBuilder<MessageType, Group> {
diff --git
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java
b/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java
similarity index 95%
copy from
gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java
copy to
gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java
index a775bc2..8a2fc9e 100644
---
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java
+++
b/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java
@@ -19,12 +19,12 @@ package org.apache.gobblin.writer;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetWriter;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
-import parquet.example.data.Group;
-import parquet.hadoop.ParquetWriter;
-
/**
* An extension to {@link FsDataWriter} that writes in Parquet format in the form of {@link Group}s.
diff --git
a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverterTest.java
b/gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverterTest.java
similarity index 98%
copy from
gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverterTest.java
copy to
gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverterTest.java
index da9a7ce..d714680 100644
---
a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverterTest.java
+++
b/gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverterTest.java
@@ -19,11 +19,8 @@ package org.apache.gobblin.converter.parquet;
import java.io.InputStreamReader;
import java.lang.reflect.Type;
-import org.apache.gobblin.configuration.SourceState;
-import org.apache.gobblin.configuration.WorkUnitState;
-import org.apache.gobblin.converter.DataConversionException;
-import org.apache.gobblin.converter.SchemaConversionException;
-import org.apache.gobblin.source.workunit.Extract;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.schema.MessageType;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -31,8 +28,11 @@ import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.reflect.TypeToken;
-import parquet.example.data.Group;
-import parquet.schema.MessageType;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.converter.DataConversionException;
+import org.apache.gobblin.converter.SchemaConversionException;
+import org.apache.gobblin.source.workunit.Extract;
import static org.testng.Assert.assertEquals;
diff --git
a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java
b/gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java
similarity index 93%
copy from
gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java
copy to
gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java
index 40c638c..46b41d7 100644
---
a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java
+++
b/gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java
@@ -22,8 +22,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.State;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
@@ -32,13 +30,16 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import parquet.example.data.Group;
-import parquet.example.data.simple.convert.GroupRecordConverter;
-import parquet.hadoop.ParquetReader;
-import parquet.hadoop.api.InitContext;
-import parquet.hadoop.api.ReadSupport;
-import parquet.io.api.RecordMaterializer;
-import parquet.schema.MessageType;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.MessageType;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_DICTIONARY;
import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_DICTIONARY_PAGE_SIZE;
@@ -91,6 +92,7 @@ public class ParquetHdfsDataWriterTest {
properties.setProp(WRITER_PARQUET_DICTIONARY_PAGE_SIZE, 1024);
properties.setProp(WRITER_PARQUET_PAGE_SIZE, 1024);
properties.setProp(WRITER_PARQUET_VALIDATE, true);
+ properties.setProp(ConfigurationKeys.WRITER_CODEC_TYPE, "gzip");
return properties;
}
diff --git
a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/TestConstants.java
b/gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/writer/TestConstants.java
similarity index 86%
copy from
gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/TestConstants.java
copy to
gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/writer/TestConstants.java
index 6144aaf..41d7ee3 100644
---
a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/TestConstants.java
+++
b/gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/writer/TestConstants.java
@@ -16,12 +16,12 @@
*/
package org.apache.gobblin.writer;
-import parquet.example.data.Group;
-import parquet.example.data.simple.SimpleGroup;
-import parquet.schema.MessageType;
-import parquet.schema.OriginalType;
-import parquet.schema.PrimitiveType;
-import parquet.schema.Types;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroup;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
public class TestConstants {
@@ -37,7 +37,7 @@ public class TestConstants {
public static final String TEST_FS_URI = "file:///";
- public static final String TEST_ROOT_DIR = System.getProperty("java.io.tmpdir");
+ public static final String TEST_ROOT_DIR = System.getProperty("java.io.tmpdir") + "/" + System.currentTimeMillis();
public static final String TEST_STAGING_DIR = TEST_ROOT_DIR + "/staging";
diff --git
a/gobblin-modules/gobblin-parquet-apache/src/test/resources/converter/JsonIntermediateToParquetConverter.json
b/gobblin-modules/gobblin-parquet-apache/src/test/resources/converter/JsonIntermediateToParquetConverter.json
new file mode 100644
index 0000000..bbd7344
--- /dev/null
+++
b/gobblin-modules/gobblin-parquet-apache/src/test/resources/converter/JsonIntermediateToParquetConverter.json
@@ -0,0 +1,259 @@
+{
+ "simplePrimitiveTypes": {
+ "record": {
+ "a": 5,
+ "b": 5.0,
+ "c": 8.0,
+ "d": true,
+ "e": "somestring",
+ "f": "2018-01-01",
+ "g": 1545083047
+ },
+ "schema": [
+ {
+ "columnName": "a",
+ "dataType": {
+ "type": "int"
+ }
+ },
+ {
+ "columnName": "b",
+ "dataType": {
+ "type": "float"
+ }
+ },
+ {
+ "columnName": "c",
+ "dataType": {
+ "type": "double"
+ }
+ },
+ {
+ "columnName": "d",
+ "dataType": {
+ "type": "boolean"
+ }
+ },
+ {
+ "columnName": "e",
+ "dataType": {
+ "type": "string"
+ }
+ },
+ {
+ "columnName": "f",
+ "dataType": {
+ "type": "date"
+ }
+ },
+ {
+ "columnName": "g",
+ "dataType": {
+ "type": "timestamp"
+ }
+ }
+ ],
+ "expectedRecord": "a: 5 ; b: 5.0 ; c: 8.0 ; d: true ; e: somestring ; f:
2018-01-01 ; g: 1545083047 ;",
+ "expectedSchema": "message test_table{ ; required int32 a ; ; required
float b ; ; required double c ; ; required boolean d ; ; required binary e
(UTF8) ; ; required binary f (UTF8) ; ; required binary g (UTF8) ; ; } ; "
+ },
+ "array": {
+ "record": {
+ "somearray": [
+ 1,
+ 2,
+ 3
+ ],
+ "somearray1": [
+ 1,
+ 2,
+ 3
+ ],
+ "somearray2": [
+ 1.0,
+ 2.0,
+ 3.0
+ ],
+ "somearray3": [
+ 1.0,
+ 2.0,
+ 3.0
+ ],
+ "somearray4": [
+ true,
+ false,
+ true
+ ],
+ "somearray5": [
+ "hello",
+ "world"
+ ]
+ },
+ "schema": [
+ {
+ "columnName": "somearray",
+ "dataType": {
+ "type": "array",
+ "items": "int"
+ },
+ "isNullable": true
+ },
+ {
+ "columnName": "somearray1",
+ "dataType": {
+ "type": "array",
+ "items": "long"
+ }
+ },
+ {
+ "columnName": "somearray2",
+ "dataType": {
+ "type": "array",
+ "items": "float"
+ }
+ },
+ {
+ "columnName": "somearray3",
+ "dataType": {
+ "type": "array",
+ "items": "double"
+ }
+ },
+ {
+ "columnName": "somearray4",
+ "dataType": {
+ "type": "array",
+ "items": "boolean"
+ }
+ },
+ {
+ "columnName": "somearray5",
+ "dataType": {
+ "type": "array",
+ "items": "string"
+ }
+ }
+ ],
+ "expectedRecord": "somearray ; item:1 ; item:2 ; item:3 ; somearray1 ;
item:1 ; item:2 ; item:3 ; somearray2 ; item:1.0 ; item:2.0 ; item:3.0 ;
somearray3 ; item:1.0 ; item:2.0 ; item:3.0 ; somearray4 ; item:true ;
item:false ; item:true ; somearray5 ; item:hello ; item:world ; ",
+ "expectedSchema": "message test_table { ; optional group somearray { ;
repeated int32 item ; ; } ; required groupsomearray1 { ; repeated int64 item
; ; } ; required groupsomearray2 { ; repeated float item ; ; } ; required
groupsomearray3 { ; repeated double item ; ; } ; required groupsomearray4 {
; repeated boolean item ; ; } ; required groupsomearray5 { ; repeated binary
item(UTF8) ; ; } ; } ; "
+ },
+ "enum": {
+ "record": {
+ "some_enum": "HELLO"
+ },
+ "schema": [
+ {
+ "columnName": "some_enum",
+ "dataType": {
+ "type": "enum",
+ "symbols": [
+ "HELLO",
+ "WORLD"
+ ]
+ },
+ "isNullable": true
+ }
+ ],
+ "expectedRecord": "some_enum : HELLO ;",
+ "expectedSchema": "message test_table { ; optional binary some_enum (UTF8)
;; } ;"
+ },
+ "enum1": {
+ "record": {
+ "some_enum": "HELLO"
+ },
+ "schema": [
+ {
+ "columnName": "some_enum",
+ "dataType": {
+ "type": "enum",
+ "symbols": [
+ "HELLO",
+ "WORLD"
+ ]
+ },
+ "isNullable": false
+ }
+ ],
+ "expectedRecord": "some_enum : HELLO ;",
+ "expectedSchema": "message test_table { ; required binary some_enum (UTF8)
;; } ;"
+ },
+ "record": {
+ "record": {
+ "some_record": {
+ "name": "me",
+ "age": 22,
+ "some_array": [
+ 3,
+ 4,
+ 5
+ ]
+ }
+ },
+ "schema": [
+ {
+ "columnName": "some_record",
+ "dataType": {
+ "type": "record",
+ "values": [
+ {
+ "columnName": "name",
+ "dataType": {
+ "type": "string"
+ }
+ },
+ {
+ "columnName": "age",
+ "dataType": {
+ "type": "long"
+ }
+ },
+ {
+ "columnName": "some_array",
+ "dataType": {
+ "type": "array",
+ "items": "int"
+ }
+ }
+ ]
+ }
+ }
+ ],
+ "expectedRecord": "some_record ; name:me ; age:22 ; some_array ; item:3 ;
item:4 ; item:5 ;",
+ "expectedSchema": "message test_table { ; required group some_record { ;
required binary name (UTF8) ; ; required int64 age ; ; required group
some_array { ; repeated int32 item ; ; } ; } ; } ; "
+ },
+ "map": {
+ "schema": [
+ {
+ "columnName": "cityToCountry",
+ "dataType": {
+ "type": "map",
+ "values": "string"
+ }
+ }
+ ],
+ "record": {
+ "cityToCountry": {
+ "ny": "US",
+ "london": "UK",
+ "delhi": "India"
+ }
+ },
+ "expectedRecord": "cityToCountry; map; key:ny;value:US; map;
key:london;value:UK; map; key:delhi;value:India;",
+ "expectedSchema": "message test_table { ; required groupcityToCountry {
; repeated group map { ; required binary key (UTF8) ; ; required
binary value (UTF8) ; ; } ; } ; } ;"
+ },
+ "nullValueInOptionalField": {
+ "record": {
+ "a": null
+ },
+ "schema": [
+ {
+ "columnName": "a",
+ "isNullable": true,
+ "dataType": {
+ "type": "int"
+ }
+ }
+ ],
+ "expectedRecord": "",
+ "expectedSchema": "message test_table {; optional int32 a ;; };"
+ }
+}
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-parquet/build.gradle
b/gobblin-modules/gobblin-parquet-common/build.gradle
similarity index 97%
copy from gobblin-modules/gobblin-parquet/build.gradle
copy to gobblin-modules/gobblin-parquet-common/build.gradle
index 75530b9..055eef2 100644
--- a/gobblin-modules/gobblin-parquet/build.gradle
+++ b/gobblin-modules/gobblin-parquet-common/build.gradle
@@ -21,7 +21,6 @@ dependencies {
compile project(":gobblin-core")
compile externalDependency.gson
- compile externalDependency.parquet
testCompile externalDependency.testng
testCompile externalDependency.mockito
diff --git
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonSchema.java
b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/converter/parquet/JsonSchema.java
similarity index 94%
rename from
gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonSchema.java
rename to
gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/converter/parquet/JsonSchema.java
index 626551d..55c3a52 100644
---
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonSchema.java
+++
b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/converter/parquet/JsonSchema.java
@@ -16,19 +16,15 @@
*/
package org.apache.gobblin.converter.parquet;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.source.extractor.schema.Schema;
-
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
-import parquet.schema.Type.Repetition;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.source.extractor.schema.Schema;
import static org.apache.gobblin.converter.parquet.JsonSchema.InputType.ENUM;
import static org.apache.gobblin.converter.parquet.JsonSchema.InputType.RECORD;
-import static parquet.schema.Type.Repetition.OPTIONAL;
-import static parquet.schema.Type.Repetition.REQUIRED;
/**
@@ -133,12 +129,10 @@ public class JsonSchema extends Schema {
}
/**
- * Parquet {@link Repetition} for this {@link JsonSchema}.
+ * Parquet {@link RepetitionType} for this {@link JsonSchema}.
* @return
*/
- public Repetition optionalOrRequired() {
- return this.isNullable() ? OPTIONAL : REQUIRED;
- }
+ //public abstract RepetitionType optionalOrRequired();
/**
* Set properties for {@link JsonSchema} from a {@link JsonObject}.
diff --git a/gobblin-modules/gobblin-parquet/build.gradle
b/gobblin-modules/gobblin-parquet/build.gradle
index 75530b9..fb56bb1 100644
--- a/gobblin-modules/gobblin-parquet/build.gradle
+++ b/gobblin-modules/gobblin-parquet/build.gradle
@@ -19,9 +19,10 @@ apply plugin: 'java'
dependencies {
compile project(":gobblin-core")
+ compile project(":gobblin-modules:gobblin-parquet-common")
compile externalDependency.gson
- compile externalDependency.parquet
+ compile externalDependency.twitterParquet
testCompile externalDependency.testng
testCompile externalDependency.mockito
diff --git
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonElementConversionFactory.java
b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonElementConversionFactory.java
index 288a5de..44cb31e 100644
---
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonElementConversionFactory.java
+++
b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonElementConversionFactory.java
@@ -22,8 +22,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import org.apache.gobblin.converter.parquet.JsonSchema.*;
-
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
@@ -43,15 +41,18 @@ import parquet.schema.PrimitiveType.PrimitiveTypeName;
import parquet.schema.Type;
import parquet.schema.Types;
-import static org.apache.gobblin.converter.parquet.JsonElementConversionFactory.RecordConverter.RecordType.CHILD;
+import org.apache.gobblin.converter.parquet.JsonSchema.*;
+
import static org.apache.gobblin.converter.parquet.JsonSchema.*;
import static org.apache.gobblin.converter.parquet.JsonSchema.InputType.STRING;
+import static org.apache.gobblin.converter.parquet.JsonElementConversionFactory.RecordConverter.RecordType.CHILD;
import static parquet.schema.OriginalType.UTF8;
import static parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
import static parquet.schema.Type.Repetition.OPTIONAL;
import static parquet.schema.Type.Repetition.REPEATED;
+import static parquet.schema.Type.Repetition.REQUIRED;
/**
@@ -177,7 +178,7 @@ public class JsonElementConversionFactory {
}
protected Type buildSchema() {
- return new PrimitiveType(this.repeated ? REPEATED : this.jsonSchema.optionalOrRequired(), this.outputType,
+ return new PrimitiveType(this.repeated ? REPEATED : optionalOrRequired(this.jsonSchema), this.outputType,
this.jsonSchema.getColumnName());
}
@@ -294,7 +295,7 @@ public class JsonElementConversionFactory {
if (this.repeated) {
return Types.repeated(BINARY).as(UTF8).named(columnName);
}
- switch (this.jsonSchema.optionalOrRequired()) {
+ switch (optionalOrRequired(this.jsonSchema)) {
case OPTIONAL:
return Types.optional(BINARY).as(UTF8).named(columnName);
case REQUIRED:
@@ -305,6 +306,10 @@ public class JsonElementConversionFactory {
}
}
+ public static Type.Repetition optionalOrRequired(JsonSchema jsonBaseSchema) {
+ return jsonBaseSchema.isNullable() ? OPTIONAL : REQUIRED;
+ }
+
public static class ArrayConverter extends CollectionConverter {
public ArrayConverter(JsonSchema arraySchema) {
@@ -325,7 +330,7 @@ public class JsonElementConversionFactory {
protected Type buildSchema() {
List<Type> fields = new ArrayList<>();
fields.add(0, this.elementConverter.schema());
- return new GroupType(this.jsonSchema.optionalOrRequired(), this.jsonSchema.getColumnName(), fields);
+ return new GroupType(optionalOrRequired(jsonSchema), this.jsonSchema.getColumnName(), fields);
}
@Override
@@ -396,7 +401,7 @@ public class JsonElementConversionFactory {
JsonElementConverter converter = this.converters.get(key);
Object convertedValue = converter.convert(entry.getValue());
boolean valueIsNull = convertedValue == null;
- Type.Repetition repetition = converter.jsonSchema.optionalOrRequired();
+ Type.Repetition repetition = optionalOrRequired(converter.jsonSchema);
if (valueIsNull && repetition.equals(OPTIONAL)) {
continue;
}
@@ -422,7 +427,7 @@ public class JsonElementConversionFactory {
case ROOT:
return new MessageType(docName, parquetTypes);
case CHILD:
- return new GroupType(this.jsonSchema.optionalOrRequired(), docName, parquetTypes);
+ return new GroupType(optionalOrRequired(this.jsonSchema), docName, parquetTypes);
default:
throw new RuntimeException("Unsupported Record type");
}
@@ -463,7 +468,7 @@ public class JsonElementConversionFactory {
Types.repeatedGroup().addFields(keyConverter.schema(),
elementConverter.schema()).named(MAP_KEY)
.asGroupType();
String columnName = this.jsonSchema.getColumnName();
- switch (this.jsonSchema.optionalOrRequired()) {
+ switch (optionalOrRequired(this.jsonSchema)) {
case OPTIONAL:
return Types.optionalGroup().addFields(mapGroup).named(columnName).asGroupType();
case REQUIRED:
diff --git
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverter.java
b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverter.java
index b04dcf8..328d86d 100644
---
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverter.java
+++
b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverter.java
@@ -16,6 +16,12 @@
*/
package org.apache.gobblin.converter.parquet;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+
+import parquet.example.data.Group;
+import parquet.schema.MessageType;
+
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.converter.Converter;
import org.apache.gobblin.converter.DataConversionException;
@@ -23,12 +29,6 @@ import org.apache.gobblin.converter.SchemaConversionException;
import org.apache.gobblin.converter.SingleRecordIterable;
import org.apache.gobblin.converter.parquet.JsonElementConversionFactory.RecordConverter;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonObject;
-
-import parquet.example.data.Group;
-import parquet.schema.MessageType;
-
import static org.apache.gobblin.converter.parquet.JsonElementConversionFactory.RecordConverter.RecordType.ROOT;
diff --git
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/ParquetGroup.java
b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/ParquetGroup.java
index 783d845..f2e0a99 100644
---
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/ParquetGroup.java
+++
b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/ParquetGroup.java
@@ -83,6 +83,7 @@ public class ParquetGroup extends Group {
return result.toString();
}
+ @Override
public Group addGroup(int fieldIndex) {
ParquetGroup g = new ParquetGroup(this.schema.getType(fieldIndex).asGroupType());
this.data[fieldIndex].add(g);
diff --git
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java
b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java
index 7ce2020..4a47792 100644
---
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java
+++
b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java
@@ -19,8 +19,6 @@ package org.apache.gobblin.writer;
import java.io.IOException;
import java.util.Optional;
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.util.ForkOperatorUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -34,6 +32,9 @@ import parquet.hadoop.example.GroupWriteSupport;
import parquet.hadoop.metadata.CompressionCodecName;
import parquet.schema.MessageType;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.ForkOperatorUtils;
+
import static org.apache.gobblin.configuration.ConfigurationKeys.LOCAL_FS_URI;
import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_CODEC_TYPE;
import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_FILE_SYSTEM_URI;
diff --git
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java
b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java
index a775bc2..744c784 100644
---
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java
+++
b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java
@@ -19,12 +19,12 @@ package org.apache.gobblin.writer;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.State;
-
import parquet.example.data.Group;
import parquet.hadoop.ParquetWriter;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+
/**
* An extension to {@link FsDataWriter} that writes in Parquet format in the form of {@link Group}s.
diff --git
a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverterTest.java
b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverterTest.java
index da9a7ce..fd828fa 100644
---
a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverterTest.java
+++
b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverterTest.java
@@ -19,11 +19,6 @@ package org.apache.gobblin.converter.parquet;
import java.io.InputStreamReader;
import java.lang.reflect.Type;
-import org.apache.gobblin.configuration.SourceState;
-import org.apache.gobblin.configuration.WorkUnitState;
-import org.apache.gobblin.converter.DataConversionException;
-import org.apache.gobblin.converter.SchemaConversionException;
-import org.apache.gobblin.source.workunit.Extract;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -34,6 +29,12 @@ import com.google.gson.reflect.TypeToken;
import parquet.example.data.Group;
import parquet.schema.MessageType;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.converter.DataConversionException;
+import org.apache.gobblin.converter.SchemaConversionException;
+import org.apache.gobblin.source.workunit.Extract;
+
import static org.testng.Assert.assertEquals;
diff --git
a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java
b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java
index 40c638c..086b084 100644
---
a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java
+++
b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java
@@ -22,8 +22,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.State;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
@@ -40,6 +38,9 @@ import parquet.hadoop.api.ReadSupport;
import parquet.io.api.RecordMaterializer;
import parquet.schema.MessageType;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+
import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_DICTIONARY;
import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_DICTIONARY_PAGE_SIZE;
import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_PAGE_SIZE;
@@ -91,6 +92,7 @@ public class ParquetHdfsDataWriterTest {
properties.setProp(WRITER_PARQUET_DICTIONARY_PAGE_SIZE, 1024);
properties.setProp(WRITER_PARQUET_PAGE_SIZE, 1024);
properties.setProp(WRITER_PARQUET_VALIDATE, true);
+ properties.setProp(ConfigurationKeys.WRITER_CODEC_TYPE, "gzip");
return properties;
}
diff --git
a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/TestConstants.java
b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/TestConstants.java
index 6144aaf..e5bf215 100644
---
a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/TestConstants.java
+++
b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/TestConstants.java
@@ -37,7 +37,7 @@ public class TestConstants {
public static final String TEST_FS_URI = "file:///";
- public static final String TEST_ROOT_DIR = System.getProperty("java.io.tmpdir");
+ public static final String TEST_ROOT_DIR = System.getProperty("java.io.tmpdir") + "/" + System.currentTimeMillis();
public static final String TEST_STAGING_DIR = TEST_ROOT_DIR + "/staging";
diff --git a/gobblin-test-harness/build.gradle
b/gobblin-test-harness/build.gradle
index e231ced..48172d2 100644
--- a/gobblin-test-harness/build.gradle
+++ b/gobblin-test-harness/build.gradle
@@ -28,6 +28,7 @@ dependencies {
testCompile externalDependency.calciteAvatica
testCompile externalDependency.jhyde
testCompile externalDependency.testng
+ testCompile externalDependency.twitterParquet
}
configurations { compile { transitive = false } }
diff --git
a/gobblin-test-harness/src/test/java/org/apache/gobblin/GobblinLocalJobLauncherUtils.java
b/gobblin-test-harness/src/test/java/org/apache/gobblin/GobblinLocalJobLauncherUtils.java
index 8bbabb7..d306ed8 100644
---
a/gobblin-test-harness/src/test/java/org/apache/gobblin/GobblinLocalJobLauncherUtils.java
+++
b/gobblin-test-harness/src/test/java/org/apache/gobblin/GobblinLocalJobLauncherUtils.java
@@ -23,13 +23,15 @@ import java.util.UUID;
import org.apache.commons.io.FileUtils;
+import com.google.common.io.Files;
+
import org.apache.gobblin.runtime.app.ApplicationLauncher;
import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
import org.apache.gobblin.runtime.local.LocalJobLauncher;
public class GobblinLocalJobLauncherUtils {
- public static final String RESOURCE_DIR = "./gobblin-test-harness/src/test/resources/runtime_test/";
+ public static final String RESOURCE_DIR = Files.createTempDir().getAbsolutePath()+"/";
public static final String SAMPLE_DIR = "test_data/daily/2016/10/01/";
public static final String DATA_PURGER_COMMIT_DATA = "data.purger.commit.data";
public static final String STATE_STORE = "state_store";
diff --git
a/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskSkipErrRecordsIntegrationTest.java
b/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskSkipErrRecordsIntegrationTest.java
index 32b1836..a2044b6 100644
---
a/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskSkipErrRecordsIntegrationTest.java
+++
b/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskSkipErrRecordsIntegrationTest.java
@@ -18,15 +18,18 @@ package org.apache.gobblin;
import java.io.File;
import java.io.IOException;
+import java.net.URL;
import java.util.Properties;
import org.apache.commons.io.FileUtils;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.JobException;
+import org.testng.Assert;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.JobException;
+
@Test
public class TaskSkipErrRecordsIntegrationTest {
@@ -70,7 +73,10 @@ public class TaskSkipErrRecordsIntegrationTest {
throws IOException {
Properties jobProperties = GobblinLocalJobLauncherUtils.getJobProperties("runtime_test/task_skip_err_records.properties");
- FileUtils.copyFile(new File(GobblinLocalJobLauncherUtils.RESOURCE_DIR + SAMPLE_FILE),
+ URL resource = getClass().getClassLoader().getResource("runtime_test/" + SAMPLE_FILE);
+ Assert.assertNotNull(resource, "Sample file not found");
+ File sampleFile = new File(resource.getFile());
+ FileUtils.copyFile(sampleFile,
new File(GobblinLocalJobLauncherUtils.RESOURCE_DIR + GobblinLocalJobLauncherUtils.SAMPLE_DIR + SAMPLE_FILE));
jobProperties.setProperty(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL,
GobblinLocalJobLauncherUtils.RESOURCE_DIR + GobblinLocalJobLauncherUtils.SAMPLE_DIR + SAMPLE_FILE);
diff --git
a/gobblin-test-harness/src/test/java/org/apache/gobblin/WriterOutputFormatIntegrationTest.java
b/gobblin-test-harness/src/test/java/org/apache/gobblin/WriterOutputFormatIntegrationTest.java
index 54a78aa..bfb8a5d 100644
---
a/gobblin-test-harness/src/test/java/org/apache/gobblin/WriterOutputFormatIntegrationTest.java
+++
b/gobblin-test-harness/src/test/java/org/apache/gobblin/WriterOutputFormatIntegrationTest.java
@@ -18,13 +18,17 @@ package org.apache.gobblin;
import java.io.File;
import java.io.IOException;
+import java.net.URL;
import java.util.Properties;
import org.apache.commons.io.FileUtils;
+import org.testng.Assert;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
+import com.google.common.io.Files;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.hive.HiveSerDeWrapper;
@@ -70,10 +74,19 @@ public class WriterOutputFormatIntegrationTest {
throws IOException {
Properties jobProperties = GobblinLocalJobLauncherUtils.getJobProperties("runtime_test/writer_output_format_test.properties");
- FileUtils.copyFile(new File(GobblinLocalJobLauncherUtils.RESOURCE_DIR + SAMPLE_FILE),
- new File(GobblinLocalJobLauncherUtils.RESOURCE_DIR + GobblinLocalJobLauncherUtils.SAMPLE_DIR + SAMPLE_FILE));
+ URL resource = getClass().getClassLoader().getResource("runtime_test/" + SAMPLE_FILE);
+ Assert.assertNotNull(resource, "Sample file should be present");
+ File sampleFile = new File(resource.getFile());
+ File testFile = File.createTempFile("writerTest", ".avro");
+ FileUtils.copyFile(sampleFile, testFile);
jobProperties.setProperty(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL,
- GobblinLocalJobLauncherUtils.RESOURCE_DIR + GobblinLocalJobLauncherUtils.SAMPLE_DIR + SAMPLE_FILE);
+ testFile.getAbsolutePath());
+
+ String outputRootDirectory = Files.createTempDir().getAbsolutePath() + "/";
+ jobProperties.setProperty(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, outputRootDirectory + "state_store");
+ jobProperties.setProperty(ConfigurationKeys.WRITER_STAGING_DIR, outputRootDirectory + "writer_staging");
+ jobProperties.setProperty(ConfigurationKeys.WRITER_OUTPUT_DIR, outputRootDirectory + "writer_output");
+ jobProperties.setProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, outputRootDirectory + "final_dir");
return jobProperties;
}
}
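[Editor's note] The two integration-test changes above share one pattern: resolve the sample file from the test classpath and stage all input/output under throwaway temp directories, so reruns never trip over leftovers in the source tree. A minimal standalone sketch of that pattern using only the commons-io and Guava calls already present in the patch; the class and method names here are illustrative:

```
// Sketch (illustrative names): copy a classpath test resource into a fresh
// temp directory so repeated runs do not touch the source tree.
import java.io.File;
import java.io.IOException;
import java.net.URL;

import org.apache.commons.io.FileUtils;

import com.google.common.io.Files;

public class TempDirResourceSketch {
  public static File stageResource(String resourcePath) throws IOException {
    URL resource = TempDirResourceSketch.class.getClassLoader().getResource(resourcePath);
    if (resource == null) {
      throw new IOException("Resource not found on classpath: " + resourcePath);
    }
    File workDir = Files.createTempDir();                       // throwaway working dir
    File staged = new File(workDir, new File(resourcePath).getName());
    FileUtils.copyFile(new File(resource.getFile()), staged);   // copy out of the classpath
    return staged;                                              // feed this path to the job
  }
}
```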
diff --git
a/gobblin-test-harness/src/test/resources/runtime_test/writer_output_format_test.properties
b/gobblin-test-harness/src/test/resources/runtime_test/writer_output_format_test.properties
index 1c0ac20..9dca5e3 100644
---
a/gobblin-test-harness/src/test/resources/runtime_test/writer_output_format_test.properties
+++
b/gobblin-test-harness/src/test/resources/runtime_test/writer_output_format_test.properties
@@ -28,15 +28,10 @@ writer.destination.type=HDFS
writer.eager.initialization=true
writer.file.path=output
-state.store.dir=./gobblin-test-harness/src/test/resources/runtime_test/state_store
-writer.staging.dir=./gobblin-test-harness/src/test/resources/runtime_test/writer_staging
-writer.output.dir=./gobblin-test-harness/src/test/resources/runtime_test/writer_output
-data.publisher.final.dir=./gobblin-test-harness/src/test/resources/runtime_test/final_dir
-
converter.classes=org.apache.gobblin.converter.avro.AvroRecordToAvroWritableConverter,org.apache.gobblin.converter.serde.HiveSerDeConverter
serde.deserializer.type=AVRO
writer.builder.class=org.apache.gobblin.writer.HiveWritableHdfsDataWriterBuilder
fork.record.queue.capacity=1
avro.schema.literal={"type":"record","name":"User","namespace":"example.avro","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}
-source.class=org.apache.gobblin.TestAvroSource
\ No newline at end of file
+source.class=org.apache.gobblin.TestAvroSource
diff --git a/gradle/scripts/dependencyDefinitions.gradle
b/gradle/scripts/dependencyDefinitions.gradle
index 8ef4b3a..f2836f7 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -168,7 +168,8 @@ ext.externalDependency = [
"grok": "io.thekraken:grok:0.1.5",
"hadoopAdl" : "org.apache.hadoop:hadoop-azure-datalake:3.0.0-alpha2",
"orcMapreduce":"org.apache.orc:orc-mapreduce:1.6.1",
- 'parquet': 'com.twitter:parquet-hadoop-bundle:1.5.0',
+ 'parquet': 'org.apache.parquet:parquet-hadoop:1.10.1',
+ 'twitterParquet': 'com.twitter:parquet-hadoop-bundle:1.5.0',
'reactivex': 'io.reactivex.rxjava2:rxjava:2.1.0',
"slf4j": [
"org.slf4j:slf4j-api:" + slf4jVersion,