This is an automated email from the ASF dual-hosted git repository.

shirshanka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
     new f44ad4f  [GOBBLIN-1015] Adding support for direct Avro and Protobuf writes in Parquet format
f44ad4f is described below

commit f44ad4f2b19f9fcd00518a748748bd5b72c34df3
Author: Shirshanka Das <s...@linkedin.com>
AuthorDate: Wed Jan 15 14:25:12 2020 -0800

    [GOBBLIN-1015] Adding support for direct Avro and Protobuf writes in Parquet format

    Closes #2860 from shirshanka/protoavroparquet
---
 defaultEnvironment.gradle                          | 3 +
 gobblin-core-base/build.gradle                     | 4 +-
 .../apache/gobblin/test/AnyToJsonConverter.java    | 4 +-
 .../apache/gobblin/test/AnyToStringConverter.java  | 4 +-
 .../apache/gobblin/test/SequentialTestSource.java  | 94 ++++++++--
 gobblin-docs/sinks/ParquetHdfsDataWriter.md        | 6 +-
 .../src/main/resources/example-parquet.pull        | 34 ++++
 .../gobblin-parquet-apache/build.gradle            | 2 +
 .../gobblin/writer/ParquetDataWriterBuilder.java   | 144 +++++++--------
 .../gobblin/writer/ParquetHdfsDataWriter.java      | 70 --------
 .../gobblin/writer/ParquetHdfsDataWriterTest.java  | 194 ++++++++++++---------
 .../org/apache/gobblin/writer/TestConstants.java   | 51 ++----
 .../writer/AbstractParquetDataWriterBuilder.java   | 73 ++++++++
 .../parquet}/writer/ParquetHdfsDataWriter.java     | 21 +--
 .../parquet/writer/ParquetRecordFormat.java        | 22 +--
 .../parquet/writer/ParquetWriterConfiguration.java | 107 ++++++++++++
 .../gobblin/parquet/writer/ParquetWriterShim.java  | 20 +--
 .../writer/test/ParquetHdfsDataWriterTestBase.java | 145 +++++++++++++++
 .../parquet/writer/test/TestConstantsBase.java     | 129 ++++++++++++++
 gobblin-modules/gobblin-parquet/build.gradle       | 2 +
 .../gobblin/writer/ParquetDataWriterBuilder.java   | 148 ++++++++--------
 .../gobblin/writer/ParquetHdfsDataWriterTest.java  | 188 +++++++++++---------
 .../org/apache/gobblin/writer/TestConstants.java   | 54 +++---
 gobblin-test-utils/build.gradle                    | 63 +++++++
 .../buildConfig/findbugs-exclude-filter.xml        | 6 +
 gobblin-test-utils/src/main/avro/TestRecord.avsc   | 23 +++
 .../java/org/apache/gobblin/test/TestRecord.java   | 0
 gobblin-test-utils/src/main/proto/TestRecord.proto | 11 ++
 gradle/scripts/dependencyDefinitions.gradle        | 7 +-
 29 files changed, 1121 insertions(+), 508 deletions(-)

diff --git a/defaultEnvironment.gradle b/defaultEnvironment.gradle
index af64d4e..b5f10a7 100644
--- a/defaultEnvironment.gradle
+++ b/defaultEnvironment.gradle
@@ -28,6 +28,9 @@ subprojects {
     maven {
       url "http://conjars.org/repo"
     }
+    maven {
+      url "https://maven.twttr.com/"
+    }
   }

   project.buildDir = new File(project.rootProject.buildDir, project.name)

diff --git a/gobblin-core-base/build.gradle b/gobblin-core-base/build.gradle
index 934a22e..80b0f67 100644
--- a/gobblin-core-base/build.gradle
+++ b/gobblin-core-base/build.gradle
@@ -18,11 +18,13 @@ apply plugin: 'java'
 apply plugin: 'me.champeau.gradle.jmh'
+
 dependencies {
   compile project(":gobblin-api")
   compile project(":gobblin-utility")
   compile project(":gobblin-metrics-libs:gobblin-metrics")
   compile project(":gobblin-modules:gobblin-codecs")
+  compile project(":gobblin-test-utils")

   compile externalDependency.reactivex
   compile externalDependency.avroMapredH2
@@ -34,7 +36,6 @@ dependencies {
   compile externalDependency.typesafeConfig
   compile externalDependency.findBugsAnnotations

-  testCompile project(":gobblin-test-utils")
   testCompile externalDependency.testng
   testCompile externalDependency.mockito
@@ -51,5 +52,4 @@ jmh {
   duplicateClassesStrategy = "EXCLUDE"
 }
-
 ext.classification="library"

diff --git
a/gobblin-core-base/src/main/java/org/apache/gobblin/test/AnyToJsonConverter.java b/gobblin-core-base/src/main/java/org/apache/gobblin/test/AnyToJsonConverter.java index c6acc06..634098b 100644 --- a/gobblin-core-base/src/main/java/org/apache/gobblin/test/AnyToJsonConverter.java +++ b/gobblin-core-base/src/main/java/org/apache/gobblin/test/AnyToJsonConverter.java @@ -31,12 +31,12 @@ import org.apache.gobblin.util.io.GsonInterfaceAdapter; /** * Converts any Object into a Json object */ -public class AnyToJsonConverter extends Converter<String, String, Object, JsonElement> { +public class AnyToJsonConverter extends Converter<Object, String, Object, JsonElement> { private static final Gson GSON = GsonInterfaceAdapter.getGson(Object.class); private boolean stripTopLevelType = true; // TODO: Configure @Override - public String convertSchema(String inputSchema, WorkUnitState workUnit) + public String convertSchema(Object inputSchema, WorkUnitState workUnit) throws SchemaConversionException { return ""; } diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/test/AnyToStringConverter.java b/gobblin-core-base/src/main/java/org/apache/gobblin/test/AnyToStringConverter.java index adc801e..3684f2d 100644 --- a/gobblin-core-base/src/main/java/org/apache/gobblin/test/AnyToStringConverter.java +++ b/gobblin-core-base/src/main/java/org/apache/gobblin/test/AnyToStringConverter.java @@ -26,10 +26,10 @@ import org.apache.gobblin.converter.SingleRecordIterable; /** * Converts any Object into a String */ -public class AnyToStringConverter extends Converter<String, String, Object, String> { +public class AnyToStringConverter extends Converter<Object, String, Object, String> { @Override - public String convertSchema(String inputSchema, WorkUnitState workUnit) + public String convertSchema(Object inputSchema, WorkUnitState workUnit) throws SchemaConversionException { return ""; } diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/test/SequentialTestSource.java b/gobblin-core-base/src/main/java/org/apache/gobblin/test/SequentialTestSource.java index 12ba1f2..5872808 100644 --- a/gobblin-core-base/src/main/java/org/apache/gobblin/test/SequentialTestSource.java +++ b/gobblin-core-base/src/main/java/org/apache/gobblin/test/SequentialTestSource.java @@ -39,28 +39,38 @@ import org.apache.gobblin.source.extractor.CheckpointableWatermark; import org.apache.gobblin.source.extractor.DataRecordException; import org.apache.gobblin.source.extractor.DefaultCheckpointableWatermark; import org.apache.gobblin.source.extractor.Extractor; -import org.apache.gobblin.stream.RecordEnvelope; import org.apache.gobblin.source.extractor.StreamingExtractor; import org.apache.gobblin.source.extractor.WatermarkInterval; import org.apache.gobblin.source.extractor.extract.LongWatermark; import org.apache.gobblin.source.workunit.Extract; import org.apache.gobblin.source.workunit.ExtractFactory; import org.apache.gobblin.source.workunit.WorkUnit; +import org.apache.gobblin.stream.RecordEnvelope; +import org.apache.gobblin.test.proto.TestRecordProtos; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.writer.WatermarkStorage; - /** * A Test source that generates a sequence of records, works in batch and streaming mode. 
*/ @Slf4j -public class SequentialTestSource implements Source<String, Object> { +public class SequentialTestSource implements Source<Object, Object> { + + private enum InMemoryFormat { + POJO, + AVRO, + PROTOBUF + } + + private static final int DEFAULT_NUM_PARALLELISM = 1; private static final String DEFAULT_NAMESPACE = "TestDB"; private static final String DEFAULT_TABLE = "TestTable"; private static final Integer DEFAULT_NUM_RECORDS_PER_EXTRACT = 100; public static final String WORK_UNIT_INDEX = "workUnitIndex"; private static final Long DEFAULT_SLEEP_TIME_PER_RECORD_MILLIS = 10L; + public static final String MEMORY_FORMAT_KEY = "inMemoryFormat"; + public static final String DEFAULT_IN_MEMORY_FORMAT = InMemoryFormat.POJO.toString(); private final AtomicBoolean configured = new AtomicBoolean(false); @@ -69,6 +79,7 @@ public class SequentialTestSource implements Source<String, Object> { private String table; private int numRecordsPerExtract; private long sleepTimePerRecord; + private InMemoryFormat inMemFormat; private final Extract.TableType tableType = Extract.TableType.APPEND_ONLY; private final ExtractFactory _extractFactory = new ExtractFactory("yyyyMMddHHmmss"); private boolean streaming = false; @@ -86,6 +97,12 @@ public class SequentialTestSource implements Source<String, Object> { if (streaming) { numRecordsPerExtract = Integer.MAX_VALUE; } + inMemFormat = InMemoryFormat.valueOf(ConfigUtils.getString(config, "source." + MEMORY_FORMAT_KEY, + DEFAULT_IN_MEMORY_FORMAT)); + log.info("Source configured with: num_parallelism: {}, namespace: {}, " + + "table: {}, numRecordsPerExtract: {}, sleepTimePerRecord: {}, streaming: {}, inMemFormat: {}", + this.num_parallelism, this.namespace, + this.table, this.numRecordsPerExtract, this.sleepTimePerRecord, this.streaming, this.inMemFormat); configured.set(true); } } @@ -110,6 +127,7 @@ public class SequentialTestSource implements Source<String, Object> { workUnit = WorkUnit.create(newExtract(tableType, namespace, table), watermarkInterval); log.debug("Will be setting watermark interval to " + watermarkInterval.toJson()); workUnit.setProp(WORK_UNIT_INDEX, workUnitState.getWorkunit().getProp(WORK_UNIT_INDEX)); + workUnit.setProp(MEMORY_FORMAT_KEY, this.inMemFormat.toString()); } else { @@ -120,6 +138,7 @@ public class SequentialTestSource implements Source<String, Object> { workUnit = WorkUnit.create(newExtract(tableType, namespace, table), watermarkInterval); log.debug("Will be setting watermark interval to " + watermarkInterval.toJson()); workUnit.setProp(WORK_UNIT_INDEX, workUnitState.getWorkunit().getProp(WORK_UNIT_INDEX)); + workUnit.setProp(MEMORY_FORMAT_KEY, this.inMemFormat.toString()); } newWorkUnits.add(workUnit); } @@ -140,6 +159,7 @@ public class SequentialTestSource implements Source<String, Object> { LongWatermark expectedHighWatermark = new LongWatermark((i + 1) * numRecordsPerExtract); workUnit.setWatermarkInterval(new WatermarkInterval(lowWatermark, expectedHighWatermark)); workUnit.setProp(WORK_UNIT_INDEX, i); + workUnit.setProp(MEMORY_FORMAT_KEY, this.inMemFormat.toString()); workUnits.add(workUnit); } return workUnits; @@ -150,12 +170,14 @@ public class SequentialTestSource implements Source<String, Object> { } - static class TestBatchExtractor implements Extractor<String, Object> { + static class TestBatchExtractor implements Extractor<Object, Object> { private long recordsExtracted = 0; private final long numRecordsPerExtract; private LongWatermark currentWatermark; private long sleepTimePerRecord; private int partition; + 
private final InMemoryFormat inMemoryFormat; + private final Object schema; WorkUnitState workUnitState; @@ -169,16 +191,34 @@ public class SequentialTestSource implements Source<String, Object> { this.numRecordsPerExtract = numRecordsPerExtract; this.sleepTimePerRecord = sleepTimePerRecord; this.workUnitState = wuState; + this.inMemoryFormat = InMemoryFormat.valueOf(this.workUnitState.getProp(MEMORY_FORMAT_KEY)); + this.schema = getSchema(inMemoryFormat); } @Override - public String getSchema() + public Object getSchema() throws IOException { - return ""; + return this.schema; + } + + private Object getSchema(InMemoryFormat inMemoryFormat) { + switch (inMemoryFormat) { + case POJO: { + return TestRecord.class; + } + case AVRO: { + return org.apache.gobblin.test.avro.TestRecord.getClassSchema(); + } + case PROTOBUF: { + return TestRecordProtos.TestRecord.class; + } + default: + throw new RuntimeException("Not implemented " + inMemoryFormat.name()); + } } @Override - public Object readRecord(@Deprecated Object reuse) + public RecordEnvelope readRecordEnvelope() throws DataRecordException, IOException { if (recordsExtracted < numRecordsPerExtract) { try { @@ -186,11 +226,37 @@ public class SequentialTestSource implements Source<String, Object> { } catch (InterruptedException e) { Throwables.propagate(e); } - TestRecord record = new TestRecord(this.partition, this.currentWatermark.getValue(), null); + Object record; + switch (this.inMemoryFormat) { + case POJO: { + record = new TestRecord(this.partition, this.currentWatermark.getValue(), "I am a POJO message"); + break; + } + case AVRO: { + record = org.apache.gobblin.test.avro.TestRecord.newBuilder() + .setPartition(this.partition) + .setSequence(this.currentWatermark.getValue()) + .setPayload("I am an Avro message") + .build(); + break; + } + case PROTOBUF: { + record = TestRecordProtos.TestRecord.newBuilder() + .setPartition(this.partition) + .setSequence(this.currentWatermark.getValue()) + .setPayload("I am a Protobuf message") + .build(); + break; + } + default: throw new RuntimeException(""); + } log.debug("Extracted record -> {}", record); + RecordEnvelope re = new RecordEnvelope<>(record, + new DefaultCheckpointableWatermark(String.valueOf(this.partition), + new LongWatermark(this.currentWatermark.getValue()))); currentWatermark.increment(); recordsExtracted++; - return record; + return re; } else { return null; } @@ -218,7 +284,7 @@ public class SequentialTestSource implements Source<String, Object> { } - static class TestStreamingExtractor implements StreamingExtractor<String, Object> { + static class TestStreamingExtractor implements StreamingExtractor<Object, Object> { private Optional<WatermarkStorage> watermarkStorage; private final TestBatchExtractor extractor; @@ -233,7 +299,7 @@ public class SequentialTestSource implements Source<String, Object> { } @Override - public String getSchema() + public Object getSchema() throws IOException { return extractor.getSchema(); } @@ -241,9 +307,7 @@ public class SequentialTestSource implements Source<String, Object> { @Override public RecordEnvelope<Object> readRecordEnvelope() throws DataRecordException, IOException { - TestRecord record = (TestRecord) extractor.readRecord(null); - return new RecordEnvelope<>((Object) record, new DefaultCheckpointableWatermark(""+record.getPartition(), - new LongWatermark(record.getSequence()))); + return extractor.readRecordEnvelope(); } @Override @@ -288,7 +352,7 @@ public class SequentialTestSource implements Source<String, Object> { @Override - public 
Extractor<String, Object> getExtractor(WorkUnitState state) + public Extractor<Object, Object> getExtractor(WorkUnitState state) throws IOException { Config config = ConfigFactory.parseProperties(state.getProperties()); configureIfNeeded(config); diff --git a/gobblin-docs/sinks/ParquetHdfsDataWriter.md b/gobblin-docs/sinks/ParquetHdfsDataWriter.md index a7a24ea..8485cbf 100644 --- a/gobblin-docs/sinks/ParquetHdfsDataWriter.md +++ b/gobblin-docs/sinks/ParquetHdfsDataWriter.md @@ -1,6 +1,6 @@ # Description -An extension to [`FsDataWriter`](https://github.com/apache/incubator-gobblin/blob/master/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java) that writes in Parquet format in the form of [`Group.java`](https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/example/data/Group.java). This implementation allows users to specify the CodecFactory to use through the configuration property [`writer.codec.type`](https://gobblin.readthe [...] +An extension to [`FsDataWriter`](https://github.com/apache/incubator-gobblin/blob/master/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java) that writes in Parquet format in the form of either Avro, Protobuf or [`ParquetGroup`](https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/example/data/Group.java). This implementation allows users to specify the CodecFactory to use through the configuration property [`writer.codec.typ [...] # Usage ``` @@ -9,6 +9,9 @@ writer.destination.type=HDFS writer.output.format=PARQUET ``` +# Example Pipeline Configuration +* [`example-parquet.pull`](https://github.com/apache/incubator-gobblin/blob/master/gobblin-example/src/main/resources/example-parquet.pull) contains an example of generating test data and writing to Parquet files. + # Configuration @@ -19,6 +22,7 @@ writer.output.format=PARQUET | writer.parquet.dictionary | To turn dictionary encoding on. Parquet has a dictionary encoding for data with a small number of unique values ( < 10^5 ) that aids in significant compression and boosts processing speed. | true | No | | writer.parquet.validate | To turn on validation using the schema. This validation is done by [`ParquetWriter`](https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java) not by Gobblin. | false | No | | writer.parquet.version | Version of parquet writer to use. Available versions are v1 and v2. | v1 | No | +| writer.parquet.format | In-memory format of the record being written to Parquet. 
[`Options`](https://github.com/apache/incubator-gobblin/blob/master/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/ParquetRecordFormat.java) are AVRO, PROTOBUF and GROUP | GROUP | No | # Developer Notes diff --git a/gobblin-example/src/main/resources/example-parquet.pull b/gobblin-example/src/main/resources/example-parquet.pull new file mode 100644 index 0000000..e61caf6 --- /dev/null +++ b/gobblin-example/src/main/resources/example-parquet.pull @@ -0,0 +1,34 @@ +job.name=ExampleParquetWriter +job.group=Parquet +job.description=This is a job that generates test data and writes to Parquet +job.lock.enabled=false +task.execution.synchronousExecutionModel=false + +source.class=org.apache.gobblin.test.SequentialTestSource +source.numParallelism=2 +source.inMemoryFormat=AVRO +#source.inMemoryFormat = POJO +#source.inMemoryFormat = PROTOBUF + +fs.uri=file:/// +#work.dir=SET_TO_WORK_DIRECTORY + +#converter.classes = CONVERTER_CLASSES_IF_ANY + +extract.table.name=TestData +extract.namespace=org.apache.gobblin.example +extract.table.type=APPEND_ONLY + +state.store.enabled=true +state.store.fs.uri=${fs.uri} +#state.store.dir=${work.dir}/store + + +writer.destination.type=HDFS +writer.output.format=PARQUET +writer.parquet.format=AVRO +#writer.parquet.format=PROTOBUF // use this for Protobuf data +writer.fs.uri=${fs.uri} +writer.builder.class=org.apache.gobblin.writer.ParquetDataWriterBuilder +data.publisher.fs.uri=${fs.uri} +data.publisher.type=org.apache.gobblin.publisher.BaseDataPublisher diff --git a/gobblin-modules/gobblin-parquet-apache/build.gradle b/gobblin-modules/gobblin-parquet-apache/build.gradle index 560638d..5cf4618 100644 --- a/gobblin-modules/gobblin-parquet-apache/build.gradle +++ b/gobblin-modules/gobblin-parquet-apache/build.gradle @@ -23,6 +23,8 @@ dependencies { compile externalDependency.gson compile externalDependency.parquet + compile externalDependency.parquetAvro + compile externalDependency.parquetProto testCompile externalDependency.testng testCompile externalDependency.mockito diff --git a/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java b/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java index ac4e2d4..2060037 100644 --- a/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java +++ b/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java @@ -17,96 +17,102 @@ package org.apache.gobblin.writer; import java.io.IOException; -import java.util.Optional; +import org.apache.avro.Schema; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; +import org.apache.parquet.avro.AvroParquetWriter; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.example.data.Group; import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.hadoop.example.GroupWriteSupport; import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.proto.ProtoParquetWriter; import org.apache.parquet.schema.MessageType; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; +import com.google.protobuf.Message; -import org.apache.gobblin.configuration.State; -import org.apache.gobblin.util.ForkOperatorUtils; +import lombok.extern.slf4j.Slf4j; -import static 
org.apache.gobblin.configuration.ConfigurationKeys.LOCAL_FS_URI; -import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_CODEC_TYPE; -import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_FILE_SYSTEM_URI; -import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_PREFIX; -import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE; -import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED; -import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED; -import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE; +import org.apache.gobblin.parquet.writer.AbstractParquetDataWriterBuilder; +import org.apache.gobblin.parquet.writer.ParquetWriterConfiguration; +import org.apache.gobblin.parquet.writer.ParquetWriterShim; -public class ParquetDataWriterBuilder extends FsDataWriterBuilder<MessageType, Group> { - public static final String WRITER_PARQUET_PAGE_SIZE = WRITER_PREFIX + ".parquet.pageSize"; - public static final String WRITER_PARQUET_DICTIONARY_PAGE_SIZE = WRITER_PREFIX + ".parquet.dictionaryPageSize"; - public static final String WRITER_PARQUET_DICTIONARY = WRITER_PREFIX + ".parquet.dictionary"; - public static final String WRITER_PARQUET_VALIDATE = WRITER_PREFIX + ".parquet.validate"; - public static final String WRITER_PARQUET_VERSION = WRITER_PREFIX + ".parquet.version"; - public static final String DEFAULT_PARQUET_WRITER = "v1"; - - @Override - public DataWriter<Group> build() - throws IOException { - Preconditions.checkNotNull(this.destination); - Preconditions.checkArgument(!Strings.isNullOrEmpty(this.writerId)); - Preconditions.checkNotNull(this.schema); - Preconditions.checkArgument(this.format == WriterOutputFormat.PARQUET); - - switch (this.destination.getType()) { - case HDFS: - return new ParquetHdfsDataWriter(this, this.destination.getProperties()); - default: - throw new RuntimeException("Unknown destination type: " + this.destination.getType()); - } - } +@Slf4j +public class ParquetDataWriterBuilder<S,D> extends AbstractParquetDataWriterBuilder<S,D> { /** - * Build a {@link ParquetWriter<Group>} for given file path with a block size. 
- * @param blockSize - * @param stagingFile + * Build a version-specific {@link ParquetWriter} for given {@link ParquetWriterConfiguration} + * @param writerConfiguration * @return * @throws IOException */ - public ParquetWriter<Group> getWriter(int blockSize, Path stagingFile) + @Override + public ParquetWriterShim getVersionSpecificWriter(ParquetWriterConfiguration writerConfiguration) throws IOException { - State state = this.destination.getProperties(); - int pageSize = state.getPropAsInt(getProperty(WRITER_PARQUET_PAGE_SIZE), DEFAULT_PAGE_SIZE); - int dictPageSize = state.getPropAsInt(getProperty(WRITER_PARQUET_DICTIONARY_PAGE_SIZE), DEFAULT_BLOCK_SIZE); - boolean enableDictionary = - state.getPropAsBoolean(getProperty(WRITER_PARQUET_DICTIONARY), DEFAULT_IS_DICTIONARY_ENABLED); - boolean validate = state.getPropAsBoolean(getProperty(WRITER_PARQUET_VALIDATE), DEFAULT_IS_VALIDATING_ENABLED); - String rootURI = state.getProp(WRITER_FILE_SYSTEM_URI, LOCAL_FS_URI); - Path absoluteStagingFile = new Path(rootURI, stagingFile); - CompressionCodecName codec = getCodecFromConfig(); - GroupWriteSupport support = new GroupWriteSupport(); - Configuration conf = new Configuration(); - GroupWriteSupport.setSchema(this.schema, conf); - ParquetProperties.WriterVersion writerVersion = getWriterVersion(); - return new ParquetWriter<>(absoluteStagingFile, support, codec, blockSize, pageSize, dictPageSize, enableDictionary, - validate, writerVersion, conf); - } - private ParquetProperties.WriterVersion getWriterVersion() { - return ParquetProperties.WriterVersion.fromString( - this.destination.getProperties().getProp(getProperty(WRITER_PARQUET_VERSION), DEFAULT_PARQUET_WRITER)); - } + CompressionCodecName codecName = CompressionCodecName.fromConf(writerConfiguration.getCodecName()); + ParquetProperties.WriterVersion writerVersion = ParquetProperties.WriterVersion + .fromString(writerConfiguration.getWriterVersion()); - private CompressionCodecName getCodecFromConfig() { - State state = this.destination.getProperties(); - String codecValue = Optional.ofNullable(state.getProp(getProperty(WRITER_CODEC_TYPE))) - .orElse(CompressionCodecName.SNAPPY.toString()); - return CompressionCodecName.valueOf(codecValue.toUpperCase()); - } + Configuration conf = new Configuration(); + ParquetWriter versionSpecificWriter = null; + switch (writerConfiguration.getRecordFormat()) { + case GROUP: { + GroupWriteSupport.setSchema((MessageType) this.schema, conf); + WriteSupport support = new GroupWriteSupport(); + versionSpecificWriter = new ParquetWriter<Group>( + writerConfiguration.getAbsoluteStagingFile(), + support, + codecName, + writerConfiguration.getBlockSize(), + writerConfiguration.getPageSize(), + writerConfiguration.getDictPageSize(), + writerConfiguration.isDictionaryEnabled(), + writerConfiguration.isValidate(), + writerVersion, + conf); + break; + } + case AVRO: { + versionSpecificWriter = new AvroParquetWriter( + writerConfiguration.getAbsoluteStagingFile(), + (Schema) this.schema, + codecName, + writerConfiguration.getBlockSize(), + writerConfiguration.getPageSize(), + writerConfiguration.isDictionaryEnabled(), + conf); + break; + } + case PROTOBUF: { + versionSpecificWriter = new ProtoParquetWriter( + writerConfiguration.getAbsoluteStagingFile(), + (Class<? 
extends Message>) this.schema, + codecName, + writerConfiguration.getBlockSize(), + writerConfiguration.getPageSize(), + writerConfiguration.isDictionaryEnabled(), + writerConfiguration.isValidate()); + break; + } + default: throw new RuntimeException("Record format not supported"); + } + ParquetWriter finalVersionSpecificWriter = versionSpecificWriter; + + return new ParquetWriterShim() { + @Override + public void write(Object record) + throws IOException { + finalVersionSpecificWriter.write(record); + } - private String getProperty(String key) { - return ForkOperatorUtils.getPropertyNameForBranch(key, this.getBranches(), this.getBranch()); + @Override + public void close() + throws IOException { + finalVersionSpecificWriter.close(); + } + }; } } diff --git a/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java b/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java deleted file mode 100644 index 8a2fc9e..0000000 --- a/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gobblin.writer; - -import java.io.IOException; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.parquet.example.data.Group; -import org.apache.parquet.hadoop.ParquetWriter; - -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.configuration.State; - - -/** - * An extension to {@link FsDataWriter} that writes in Parquet format in the form of {@link Group}s. - * - * <p> - * This implementation allows users to specify the {@link parquet.hadoop.CodecFactory} to use through the configuration - * property {@link ConfigurationKeys#WRITER_CODEC_TYPE}. By default, the deflate codec is used. 
- * </p> - * - * @author tilakpatidar - */ -public class ParquetHdfsDataWriter extends FsDataWriter<Group> { - private final ParquetWriter<Group> writer; - protected final AtomicLong count = new AtomicLong(0); - - public ParquetHdfsDataWriter(ParquetDataWriterBuilder builder, State state) - throws IOException { - super(builder, state); - this.writer = builder.getWriter((int) this.blockSize, this.stagingFile); - } - - @Override - public void write(Group record) - throws IOException { - this.writer.write(record); - this.count.incrementAndGet(); - } - - @Override - public long recordsWritten() { - return this.count.get(); - } - - @Override - public void close() - throws IOException { - try { - this.writer.close(); - } finally { - super.close(); - } - } -} diff --git a/gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java b/gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java index 46b41d7..fe3a51a 100644 --- a/gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java +++ b/gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java @@ -21,91 +21,117 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; -import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - +import org.apache.parquet.avro.AvroParquetReader; import org.apache.parquet.example.data.Group; import org.apache.parquet.example.data.simple.convert.GroupRecordConverter; import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.hadoop.api.InitContext; import org.apache.parquet.hadoop.api.ReadSupport; import org.apache.parquet.io.api.RecordMaterializer; +import org.apache.parquet.proto.ProtoParquetReader; import org.apache.parquet.schema.MessageType; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.configuration.State; +import lombok.extern.slf4j.Slf4j; -import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_DICTIONARY; -import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_DICTIONARY_PAGE_SIZE; -import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_PAGE_SIZE; -import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_VALIDATE; +import org.apache.gobblin.parquet.writer.ParquetRecordFormat; +import org.apache.gobblin.parquet.writer.test.ParquetHdfsDataWriterTestBase; +import org.apache.gobblin.test.TestRecord; +import org.apache.gobblin.test.proto.TestRecordProtos; @Test(groups = {"gobblin.writer"}) -public class ParquetHdfsDataWriterTest { +@Slf4j +public class ParquetHdfsDataWriterTest extends ParquetHdfsDataWriterTestBase { - private MessageType schema; - private String filePath; - private ParquetHdfsDataWriter writer; - private State properties; + + public ParquetHdfsDataWriterTest() { + super(new TestConstants()); + } + + @Override + protected DataWriterBuilder getDataWriterBuilder() { + return new ParquetDataWriterBuilder(); + } @BeforeMethod public 
void setUp() throws Exception { - // Making the staging and/or output dirs if necessary - File stagingDir = new File(TestConstants.TEST_STAGING_DIR); - File outputDir = new File(TestConstants.TEST_OUTPUT_DIR); - if (!stagingDir.exists()) { - boolean mkdirs = stagingDir.mkdirs(); - assert mkdirs; - } - if (!outputDir.exists()) { - boolean mkdirs = outputDir.mkdirs(); - assert mkdirs; - } - this.schema = TestConstants.PARQUET_SCHEMA; - this.filePath = getFilePath(); - this.properties = createStateWithConfig(); - this.writer = (ParquetHdfsDataWriter) getParquetDataWriterBuilder().build(); + super.setUp(); } - private String getFilePath() { - return TestConstants.TEST_EXTRACT_NAMESPACE.replaceAll("\\.", "/") + "/" + TestConstants.TEST_EXTRACT_TABLE + "/" - + TestConstants.TEST_EXTRACT_ID + "_" + TestConstants.TEST_EXTRACT_PULL_TYPE; + @Override + protected List<TestRecord> readParquetRecordsFromFile(File outputFile, ParquetRecordFormat format) + throws IOException { + switch (format) { + case GROUP: { + return readParquetFilesGroup(outputFile); + } + case PROTOBUF: { + return readParquetFilesProto(outputFile); + } + case AVRO: { + return readParquetFilesAvro(outputFile); + } + default: throw new RuntimeException(format + " is not supported"); + } } - private State createStateWithConfig() { - State properties = new State(); - properties.setProp(ConfigurationKeys.WRITER_BUFFER_SIZE, ConfigurationKeys.DEFAULT_BUFFER_SIZE); - properties.setProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, TestConstants.TEST_FS_URI); - properties.setProp(ConfigurationKeys.WRITER_STAGING_DIR, TestConstants.TEST_STAGING_DIR); - properties.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, TestConstants.TEST_OUTPUT_DIR); - properties.setProp(ConfigurationKeys.WRITER_FILE_PATH, this.filePath); - properties.setProp(ConfigurationKeys.WRITER_FILE_NAME, TestConstants.PARQUET_TEST_FILENAME); - properties.setProp(WRITER_PARQUET_DICTIONARY, true); - properties.setProp(WRITER_PARQUET_DICTIONARY_PAGE_SIZE, 1024); - properties.setProp(WRITER_PARQUET_PAGE_SIZE, 1024); - properties.setProp(WRITER_PARQUET_VALIDATE, true); - properties.setProp(ConfigurationKeys.WRITER_CODEC_TYPE, "gzip"); - return properties; + private List<TestRecord> readParquetFilesAvro(File outputFile) + throws IOException { + ParquetReader<org.apache.gobblin.test.avro.TestRecord> reader = null; + List<TestRecord> records = new ArrayList<>(); + try { + reader = new AvroParquetReader<>(new Path(outputFile.toString())); + for (org.apache.gobblin.test.avro.TestRecord value = reader.read(); value != null; value = reader.read()) { + records.add(new TestRecord(value.getPartition(), + value.getSequence(), + value.getPayload())); + } + } finally { + if (reader != null) { + try { + reader.close(); + } catch (Exception ex) { + System.out.println(ex.getMessage()); + } + } + } + return records; + } - private ParquetDataWriterBuilder getParquetDataWriterBuilder() { - ParquetDataWriterBuilder writerBuilder = new ParquetDataWriterBuilder(); - writerBuilder.destination = Destination.of(Destination.DestinationType.HDFS, properties); - writerBuilder.writerId = TestConstants.TEST_WRITER_ID; - writerBuilder.schema = this.schema; - writerBuilder.format = WriterOutputFormat.PARQUET; - return writerBuilder; + protected List<TestRecord> readParquetFilesProto(File outputFile) + throws IOException { + ParquetReader<TestRecordProtos.TestRecordOrBuilder> reader = null; + List<TestRecord> records = new ArrayList<>(); + try { + reader = new ProtoParquetReader<>(new Path(outputFile.toString())); + for 
(TestRecordProtos.TestRecordOrBuilder value = reader.read(); value != null; value = reader.read()) { + records.add(new TestRecord(value.getPartition(), + value.getSequence(), + value.getPayload())); + } + } finally { + if (reader != null) { + try { + reader.close(); + } catch (Exception ex) { + System.out.println(ex.getMessage()); + } + } + } + return records; } - private List<Group> readParquetFiles(File outputFile) + + protected List<TestRecord> readParquetFilesGroup(File outputFile) throws IOException { ParquetReader<Group> reader = null; List<Group> records = new ArrayList<>(); @@ -123,46 +149,40 @@ public class ParquetHdfsDataWriterTest { } } } - return records; + return records.stream().map(value -> new TestRecord( + value.getInteger(TestConstants.PARTITION_FIELD_NAME, 0), + value.getLong(TestConstants.SEQUENCE_FIELD_NAME, 0), + value.getString(TestConstants.PAYLOAD_FIELD_NAME, 0) + )).collect(Collectors.toList()); } @Test public void testWrite() throws Exception { - long firstWrite; - long secondWrite; - List<Group> records; - Group record1 = TestConstants.PARQUET_RECORD_1; - Group record2 = TestConstants.PARQUET_RECORD_2; - String filePath = TestConstants.TEST_OUTPUT_DIR + Path.SEPARATOR + this.filePath; - File outputFile = new File(filePath, TestConstants.PARQUET_TEST_FILENAME); - - this.writer.write(record1); - firstWrite = this.writer.recordsWritten(); - this.writer.write(record2); - secondWrite = this.writer.recordsWritten(); - this.writer.close(); - this.writer.commit(); - records = readParquetFiles(outputFile); - Group resultRecord1 = records.get(0); - Group resultRecord2 = records.get(1); - - Assert.assertEquals(firstWrite, 1); - Assert.assertEquals(secondWrite, 2); - Assert.assertEquals(resultRecord1.getString("name", 0), "tilak"); - Assert.assertEquals(resultRecord1.getInteger("age", 0), 22); - Assert.assertEquals(resultRecord2.getString("name", 0), "other"); - Assert.assertEquals(resultRecord2.getInteger("age", 0), 22); + super.testWrite(); + } + + @Override + protected Object getSchema(ParquetRecordFormat format) { + switch (format) { + case GROUP: { + return TestConstants.PARQUET_SCHEMA; + } + case PROTOBUF: { + return TestRecordProtos.TestRecord.class; + } + case AVRO: { + return org.apache.gobblin.test.avro.TestRecord.getClassSchema(); + } + default: + throw new RuntimeException(format.name() + " is not implemented"); + } } @AfterClass public void tearDown() throws IOException { - // Clean up the staging and/or output directories if necessary - File testRootDir = new File(TestConstants.TEST_ROOT_DIR); - if (testRootDir.exists()) { - FileUtil.fullyDelete(testRootDir); - } + super.tearDown(); } class SimpleReadSupport extends ReadSupport<Group> { diff --git a/gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/writer/TestConstants.java b/gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/writer/TestConstants.java index 41d7ee3..9b7f7f6 100644 --- a/gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/writer/TestConstants.java +++ b/gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/writer/TestConstants.java @@ -23,40 +23,27 @@ import org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Types; +import org.apache.gobblin.parquet.writer.test.TestConstantsBase; +import org.apache.gobblin.test.TestRecord; -public class TestConstants { - public static final MessageType PARQUET_SCHEMA = Types.buildMessage() - 
.addFields(Types.required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("name"), - Types.optional(PrimitiveType.PrimitiveTypeName.INT32).named("age")).named("User"); - - public static final Group PARQUET_RECORD_1 = new SimpleGroup(PARQUET_SCHEMA); - - public static final Group PARQUET_RECORD_2 = new SimpleGroup(PARQUET_SCHEMA); - - public static final String PARQUET_TEST_FILENAME = "test.parquet"; - - public static final String TEST_FS_URI = "file:///"; - - public static final String TEST_ROOT_DIR = System.getProperty("java.io.tmpdir") + "/" + System.currentTimeMillis(); - - public static final String TEST_STAGING_DIR = TEST_ROOT_DIR + "/staging"; - public static final String TEST_OUTPUT_DIR = TEST_ROOT_DIR + "/output"; +public class TestConstants extends TestConstantsBase<Group> { - public static final String TEST_WRITER_ID = "writer-1"; - - public static final String TEST_EXTRACT_NAMESPACE = "com.linkedin.writer.test"; - - public static final String TEST_EXTRACT_ID = String.valueOf(System.currentTimeMillis()); - - public static final String TEST_EXTRACT_TABLE = "TestTable"; - - public static final String TEST_EXTRACT_PULL_TYPE = "FULL"; - - static { - PARQUET_RECORD_1.add("name", "tilak"); - PARQUET_RECORD_1.add("age", 22); - PARQUET_RECORD_2.add("name", "other"); - PARQUET_RECORD_2.add("age", 22); + public static final MessageType PARQUET_SCHEMA = Types.buildMessage() + .addFields( + Types.required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8) + .named(TestConstants.PAYLOAD_FIELD_NAME), + Types.required(PrimitiveType.PrimitiveTypeName.INT32).named(TestConstants.PARTITION_FIELD_NAME), + Types.required(PrimitiveType.PrimitiveTypeName.INT64).named(TestConstants.SEQUENCE_FIELD_NAME)) + .named("Data"); + + @Override + public Group convertToParquetGroup(TestRecord record) { + Group group = new SimpleGroup(PARQUET_SCHEMA); + group.add(PAYLOAD_FIELD_NAME, record.getPayload()); + group.add(SEQUENCE_FIELD_NAME, Long.valueOf(record.getSequence())); + group.add(PARTITION_FIELD_NAME, record.getPartition()); + return group; } + } diff --git a/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/AbstractParquetDataWriterBuilder.java b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/AbstractParquetDataWriterBuilder.java new file mode 100644 index 0000000..0ed3965 --- /dev/null +++ b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/AbstractParquetDataWriterBuilder.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.parquet.writer; + +import java.io.IOException; + +import org.apache.hadoop.fs.Path; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.writer.DataWriter; +import org.apache.gobblin.writer.FsDataWriterBuilder; +import org.apache.gobblin.writer.WriterOutputFormat; + + +@Slf4j +public abstract class AbstractParquetDataWriterBuilder<S,D> extends FsDataWriterBuilder<S, D> { + + @Override + public DataWriter<D> build() + throws IOException { + Preconditions.checkNotNull(this.destination); + Preconditions.checkArgument(!Strings.isNullOrEmpty(this.writerId)); + Preconditions.checkNotNull(this.schema); + Preconditions.checkArgument(this.format == WriterOutputFormat.PARQUET); + + switch (this.destination.getType()) { + case HDFS: + return new ParquetHdfsDataWriter<D>(this, this.destination.getProperties()); + default: + throw new RuntimeException("Unknown destination type: " + this.destination.getType()); + } + } + + protected abstract ParquetWriterShim getVersionSpecificWriter(ParquetWriterConfiguration writerConfiguration) + throws IOException; + + /** + * Build a {@link ParquetWriterShim <D>} for given file path with a block size. + * @param blockSize + * @param stagingFile + * @return + * @throws IOException + */ + public ParquetWriterShim<D> getWriter(int blockSize, Path stagingFile) + throws IOException { + State state = this.destination.getProperties(); + ParquetWriterConfiguration writerConfiguration = + new ParquetWriterConfiguration(state, this.getBranches(), this.getBranch(), stagingFile, blockSize); + + log.info("Parquet writer configured with {}", writerConfiguration); + return getVersionSpecificWriter(writerConfiguration); + } + +} diff --git a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/ParquetHdfsDataWriter.java similarity index 70% rename from gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java rename to gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/ParquetHdfsDataWriter.java index 744c784..47d5624 100644 --- a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java +++ b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/ParquetHdfsDataWriter.java @@ -14,40 +14,37 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.gobblin.writer; +package org.apache.gobblin.parquet.writer; import java.io.IOException; import java.util.concurrent.atomic.AtomicLong; -import parquet.example.data.Group; -import parquet.hadoop.ParquetWriter; - -import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; +import org.apache.gobblin.writer.FsDataWriter; /** - * An extension to {@link FsDataWriter} that writes in Parquet format in the form of {@link Group}s. + * An extension to {@link FsDataWriter} that writes in Parquet formats. * * <p> - * This implementation allows users to specify the {@link parquet.hadoop.CodecFactory} to use through the configuration - * property {@link ConfigurationKeys#WRITER_CODEC_TYPE}. By default, the deflate codec is used. 
+ * This implementation allows users to specify different formats and codecs + * through {@link ParquetWriterConfiguration} to write data. * </p> * * @author tilakpatidar */ -public class ParquetHdfsDataWriter extends FsDataWriter<Group> { - private final ParquetWriter<Group> writer; +public class ParquetHdfsDataWriter<D> extends FsDataWriter<D> { + private final ParquetWriterShim writer; protected final AtomicLong count = new AtomicLong(0); - public ParquetHdfsDataWriter(ParquetDataWriterBuilder builder, State state) + public ParquetHdfsDataWriter(AbstractParquetDataWriterBuilder builder, State state) throws IOException { super(builder, state); this.writer = builder.getWriter((int) this.blockSize, this.stagingFile); } @Override - public void write(Group record) + public void write(D record) throws IOException { this.writer.write(record); this.count.incrementAndGet(); diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/test/TestRecord.java b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/ParquetRecordFormat.java similarity index 75% copy from gobblin-core-base/src/main/java/org/apache/gobblin/test/TestRecord.java copy to gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/ParquetRecordFormat.java index eb07977..4403f03 100644 --- a/gobblin-core-base/src/main/java/org/apache/gobblin/test/TestRecord.java +++ b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/ParquetRecordFormat.java @@ -14,21 +14,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.gobblin.test; - -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.ToString; - +package org.apache.gobblin.parquet.writer; /** - * A Test record + * Enum to hold the supported values for formats supported by the Parquet writer + * @see {@link ParquetWriterConfiguration} for configuration keys to set them */ -@Getter -@ToString -@AllArgsConstructor -public class TestRecord { - private int partition; - private long sequence; - private String payload; +public enum ParquetRecordFormat { + GROUP, + AVRO, + PROTOBUF; + } diff --git a/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/ParquetWriterConfiguration.java b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/ParquetWriterConfiguration.java new file mode 100644 index 0000000..aeb426f --- /dev/null +++ b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/ParquetWriterConfiguration.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.parquet.writer; + +import org.apache.hadoop.fs.Path; + +import com.typesafe.config.Config; + +import lombok.Getter; +import lombok.ToString; + +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.ForkOperatorUtils; + +import static org.apache.gobblin.configuration.ConfigurationKeys.LOCAL_FS_URI; +import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_CODEC_TYPE; +import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_FILE_SYSTEM_URI; +import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_PREFIX; + + +/** + * Holds configuration for the {@link ParquetHdfsDataWriter} + */ +@Getter @ToString +public class ParquetWriterConfiguration { + public static final String WRITER_PARQUET_PAGE_SIZE = WRITER_PREFIX + ".parquet.pageSize"; + public static final String WRITER_PARQUET_DICTIONARY_PAGE_SIZE = WRITER_PREFIX + ".parquet.dictionaryPageSize"; + public static final String WRITER_PARQUET_DICTIONARY = WRITER_PREFIX + ".parquet.dictionary"; + public static final String WRITER_PARQUET_VALIDATE = WRITER_PREFIX + ".parquet.validate"; + public static final String WRITER_PARQUET_VERSION = WRITER_PREFIX + ".parquet.version"; + public static final String DEFAULT_PARQUET_WRITER = "v1"; + public static final String WRITER_PARQUET_FORMAT = WRITER_PREFIX + ".parquet.format"; + public static final String DEFAULT_PARQUET_FORMAT = "group"; + + + + public static final int DEFAULT_BLOCK_SIZE = 128 * 1024 * 1024; + public static final int DEFAULT_PAGE_SIZE = 1 * 1024 * 1024; + public static final String DEFAULT_COMPRESSION_CODEC_NAME = "UNCOMPRESSED"; + public static final String[] ALLOWED_COMPRESSION_CODECS = {"SNAPPY", "LZO", "UNCOMPRESSED", "GZIP"}; + public static final boolean DEFAULT_IS_DICTIONARY_ENABLED = true; + public static final boolean DEFAULT_IS_VALIDATING_ENABLED = false; + public static final String DEFAULT_WRITER_VERSION = "v1"; + public static final String[] ALLOWED_WRITER_VERSIONS = {"v1", "v2"}; + + + private final int pageSize; + private final int dictPageSize; + private final boolean dictionaryEnabled; + private final boolean validate; + private final String writerVersion; + private final ParquetRecordFormat recordFormat; + + + private final int numBranches; + private final int branchId; + private final String codecName; + private final Path absoluteStagingFile; + private final int blockSize; + + public ParquetWriterConfiguration(State state, int numBranches, int branchId, Path stagingFile, int blockSize) { + this(ConfigUtils.propertiesToConfig(state.getProperties()), numBranches, branchId, stagingFile, blockSize); + } + + + private String getProperty(String key) { + return ForkOperatorUtils.getPropertyNameForBranch(key, numBranches, branchId); + } + + public static ParquetRecordFormat getRecordFormatFromConfig(Config config) { + String writeSupport = ConfigUtils.getString(config, WRITER_PARQUET_FORMAT, DEFAULT_PARQUET_FORMAT); + ParquetRecordFormat recordFormat = ParquetRecordFormat.valueOf(writeSupport.toUpperCase()); + return recordFormat; + } + + + ParquetWriterConfiguration(Config config, int numBranches, int branchId, Path stagingFile, int blockSize) { + this.numBranches = numBranches; + this.branchId = branchId; + this.pageSize = ConfigUtils.getInt(config, getProperty(WRITER_PARQUET_PAGE_SIZE), DEFAULT_PAGE_SIZE); + this.dictPageSize = ConfigUtils.getInt(config, getProperty(WRITER_PARQUET_DICTIONARY_PAGE_SIZE), DEFAULT_BLOCK_SIZE); + 
this.dictionaryEnabled = + ConfigUtils.getBoolean(config, getProperty(WRITER_PARQUET_DICTIONARY), DEFAULT_IS_DICTIONARY_ENABLED); + this.validate = ConfigUtils.getBoolean(config, getProperty(WRITER_PARQUET_VALIDATE), DEFAULT_IS_VALIDATING_ENABLED); + String rootURI = ConfigUtils.getString(config, WRITER_FILE_SYSTEM_URI, LOCAL_FS_URI); + this.absoluteStagingFile = new Path(rootURI, stagingFile); + this.codecName = ConfigUtils.getString(config,getProperty(WRITER_CODEC_TYPE), DEFAULT_COMPRESSION_CODEC_NAME); + this.recordFormat = getRecordFormatFromConfig(config); + this.writerVersion = ConfigUtils.getString(config, getProperty(WRITER_PARQUET_VERSION), DEFAULT_WRITER_VERSION); + this.blockSize = blockSize; + } +} diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/test/TestRecord.java b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/ParquetWriterShim.java similarity index 71% copy from gobblin-core-base/src/main/java/org/apache/gobblin/test/TestRecord.java copy to gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/ParquetWriterShim.java index eb07977..9e64fda 100644 --- a/gobblin-core-base/src/main/java/org/apache/gobblin/test/TestRecord.java +++ b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/ParquetWriterShim.java @@ -14,21 +14,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.gobblin.test; +package org.apache.gobblin.parquet.writer; -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.ToString; +import java.io.Closeable; +import java.io.IOException; /** - * A Test record + * An interface to shield gobblin-parquet-common integration from different parquet version specific interfaces + * @param <D> */ -@Getter -@ToString -@AllArgsConstructor -public class TestRecord { - private int partition; - private long sequence; - private String payload; +public interface ParquetWriterShim<D> extends Closeable { + void write(D record) + throws IOException; } diff --git a/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/test/ParquetHdfsDataWriterTestBase.java b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/test/ParquetHdfsDataWriterTestBase.java new file mode 100644 index 0000000..10fc1a8 --- /dev/null +++ b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/test/ParquetHdfsDataWriterTestBase.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.parquet.writer.test; + +import java.io.File; +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; + +import junit.framework.Assert; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.parquet.writer.ParquetRecordFormat; +import org.apache.gobblin.parquet.writer.ParquetWriterConfiguration; +import org.apache.gobblin.test.TestRecord; +import org.apache.gobblin.writer.DataWriter; +import org.apache.gobblin.writer.DataWriterBuilder; +import org.apache.gobblin.writer.Destination; +import org.apache.gobblin.writer.WriterOutputFormat; + + +/** + * Base class for building version-specific tests for Parquet + */ +@Slf4j +public abstract class ParquetHdfsDataWriterTestBase { + + public ParquetHdfsDataWriterTestBase(TestConstantsBase testConstants) + { + this.testConstants = testConstants; + } + + private final TestConstantsBase testConstants; + private String filePath; + private DataWriter writer; + + protected abstract DataWriterBuilder getDataWriterBuilder(); + + public void setUp() + throws Exception { + // Making the staging and/or output dirs if necessary + File stagingDir = new File(this.testConstants.TEST_STAGING_DIR); + File outputDir = new File(this.testConstants.TEST_OUTPUT_DIR); + if (!stagingDir.exists()) { + boolean mkdirs = stagingDir.mkdirs(); + assert mkdirs; + } + if (!outputDir.exists()) { + boolean mkdirs = outputDir.mkdirs(); + assert mkdirs; + } + this.filePath = getFilePath(); + } + + private String getFilePath() { + return TestConstantsBase.TEST_EXTRACT_NAMESPACE.replaceAll("\\.", "/") + "/" + TestConstantsBase.TEST_EXTRACT_TABLE + "/" + + TestConstantsBase.TEST_EXTRACT_ID + "_" + TestConstantsBase.TEST_EXTRACT_PULL_TYPE; + } + + private State createStateWithConfig(ParquetRecordFormat format) { + State properties = new State(); + properties.setProp(ConfigurationKeys.WRITER_BUFFER_SIZE, ConfigurationKeys.DEFAULT_BUFFER_SIZE); + properties.setProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, TestConstantsBase.TEST_FS_URI); + properties.setProp(ConfigurationKeys.WRITER_STAGING_DIR, TestConstantsBase.TEST_STAGING_DIR); + properties.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, TestConstantsBase.TEST_OUTPUT_DIR); + properties.setProp(ConfigurationKeys.WRITER_FILE_PATH, this.filePath); + properties.setProp(ConfigurationKeys.WRITER_FILE_NAME, this.testConstants.getParquetTestFilename(format.name())); + properties.setProp(ParquetWriterConfiguration.WRITER_PARQUET_DICTIONARY, true); + properties.setProp(ParquetWriterConfiguration.WRITER_PARQUET_DICTIONARY_PAGE_SIZE, 1024); + properties.setProp(ParquetWriterConfiguration.WRITER_PARQUET_PAGE_SIZE, 1024); + properties.setProp(ParquetWriterConfiguration.WRITER_PARQUET_VALIDATE, true); + properties.setProp(ParquetWriterConfiguration.WRITER_PARQUET_FORMAT, format.toString()); + properties.setProp(ConfigurationKeys.WRITER_CODEC_TYPE, "gzip"); + return properties; + } + + + protected abstract List<TestRecord> readParquetRecordsFromFile(File outputFile, ParquetRecordFormat format) throws IOException; + + + public void testWrite() + throws Exception { + ParquetRecordFormat[] formats = ParquetRecordFormat.values(); + for (ParquetRecordFormat format : formats) { + State formatSpecificProperties = createStateWithConfig(format); + + this.writer = getDataWriterBuilder() + .writeTo(Destination.of(Destination.DestinationType.HDFS, 
formatSpecificProperties)) + .withWriterId(TestConstantsBase.TEST_WRITER_ID) + .writeInFormat(WriterOutputFormat.PARQUET) + .withSchema(getSchema(format)) + .build(); + + for (int i=0; i < 2; ++i) { + Object record = this.testConstants.getRecord(i, format); + this.writer.write(record); + Assert.assertEquals(i+1, this.writer.recordsWritten()); + } + this.writer.close(); + this.writer.commit(); + + String filePath = TestConstantsBase.TEST_OUTPUT_DIR + Path.SEPARATOR + this.filePath; + File outputFile = new File(filePath, this.testConstants.getParquetTestFilename(format.name())); + + List<TestRecord> records = readParquetRecordsFromFile(outputFile, format); + for (int i = 0; i < 2; ++i) { + TestRecord resultRecord = records.get(i); + log.debug("Testing {} record {}", i, resultRecord); + Assert.assertEquals(TestConstantsBase.getPayloadValues()[i], resultRecord.getPayload()); + Assert.assertEquals(TestConstantsBase.getSequenceValues()[i], resultRecord.getSequence()); + Assert.assertEquals(TestConstantsBase.getPartitionValues()[i], resultRecord.getPartition()); + } + } + } + + protected abstract Object getSchema(ParquetRecordFormat format); + + public void tearDown() + throws IOException { + // Clean up the staging and/or output directories if necessary + File testRootDir = new File(TestConstantsBase.TEST_ROOT_DIR); + if (testRootDir.exists()) { + FileUtil.fullyDelete(testRootDir); + } + } + +} diff --git a/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/test/TestConstantsBase.java b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/test/TestConstantsBase.java new file mode 100644 index 0000000..3326022 --- /dev/null +++ b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/test/TestConstantsBase.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.parquet.writer.test; + +import java.util.Arrays; + +import org.apache.avro.generic.GenericRecord; + +import com.google.protobuf.Message; + +import org.apache.gobblin.parquet.writer.ParquetRecordFormat; +import org.apache.gobblin.test.TestRecord; +import org.apache.gobblin.test.proto.TestRecordProtos; + + +/** + * Holder for TestConstantsBase + * @param <ParquetGroup> : the class that implements ParquetGroup, generic to allow package-specific overrides + */ +public abstract class TestConstantsBase<ParquetGroup> { + + public static TestRecord[] getTestValues() { + return Arrays.copyOf(TEST_VALUES, TEST_VALUES.length); + } + + public static String[] getPayloadValues() { + return Arrays.copyOf(PAYLOAD_VALUES, PAYLOAD_VALUES.length); + } + + public static int[] getSequenceValues() { + return Arrays.copyOf(SEQUENCE_VALUES, SEQUENCE_VALUES.length); + } + + public static int[] getPartitionValues() { + return Arrays.copyOf(PARTITION_VALUES, PARTITION_VALUES.length); + } + + public final String getParquetTestFilename(String format) { return "test-"+format+".parquet"; }; + + public static final String TEST_FS_URI = "file:///"; + + public static final String TEST_ROOT_DIR = System.getProperty("java.io.tmpdir") + "/" + System.currentTimeMillis(); + + public static final String TEST_STAGING_DIR = TEST_ROOT_DIR + "/staging"; + + public static final String TEST_OUTPUT_DIR = TEST_ROOT_DIR + "/output"; + + public static final String TEST_WRITER_ID = "writer-1"; + + public static final String TEST_EXTRACT_NAMESPACE = "com.linkedin.writer.test"; + + public static final String TEST_EXTRACT_ID = String.valueOf(System.currentTimeMillis()); + + public static final String TEST_EXTRACT_TABLE = "TestTable"; + + public static final String TEST_EXTRACT_PULL_TYPE = "FULL"; + + + private static final TestRecord[] TEST_VALUES = new TestRecord[2]; + public static final String PAYLOAD_FIELD_NAME = "payload"; + public static final String SEQUENCE_FIELD_NAME = "sequence"; + public static final String PARTITION_FIELD_NAME = "partition"; + private static final String[] PAYLOAD_VALUES = {"value1", "value2"}; + private static final int[] SEQUENCE_VALUES = {1, 2}; + private static final int[] PARTITION_VALUES = {0, 1}; + static { + for (int i=0; i < 2; ++i) { + TestRecord record = new TestRecord(getPartitionValues()[i], + getSequenceValues()[i], + getPayloadValues()[i]); + TEST_VALUES[i] = record; + } + } + + + public Object getRecord(int index, ParquetRecordFormat format) { + switch (format) { + case GROUP: { + return convertToParquetGroup(getTestValues()[index]); + } + case PROTOBUF: { + return getProtobufMessage(getTestValues()[index]); + } + case AVRO: { + return getAvroMessage(getTestValues()[index]); + } + default: { + throw new RuntimeException("Not understanding format " + format); + } + } + } + + + public abstract ParquetGroup convertToParquetGroup(TestRecord record); + + + private Message getProtobufMessage(TestRecord testValue) { + return TestRecordProtos.TestRecord.newBuilder() + .setPayload(testValue.getPayload()) + .setPartition(testValue.getPartition()) + .setSequence(testValue.getSequence()) + .build(); + } + + private GenericRecord getAvroMessage(TestRecord record) { + org.apache.gobblin.test.avro.TestRecord testRecord = new org.apache.gobblin.test.avro.TestRecord(); + testRecord.setPayload(record.getPayload()); + testRecord.setPartition(record.getPartition()); + testRecord.setSequence(record.getSequence()); + return testRecord; + } + + + +} diff --git 
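Taken together, the test base and constants above exercise the new multi-format writer end to end. The following is a minimal sketch (not part of this patch) of the same wiring outside the test harness; it mirrors createStateWithConfig() and testWrite() in ParquetHdfsDataWriterTestBase, and the directory paths, file path/name, writer id and the Avro record instance are placeholders rather than values from this commit.

  // Hedged sketch: write one Avro TestRecord as Parquet via the new builder.
  // Paths, writer id and file name below are illustrative placeholders.
  void writeOneAvroRecordAsParquet(org.apache.gobblin.test.avro.TestRecord avroRecord) throws java.io.IOException {
    State props = new State();
    props.setProp(ConfigurationKeys.WRITER_STAGING_DIR, "/tmp/parquet-staging");   // placeholder
    props.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, "/tmp/parquet-output");     // placeholder
    props.setProp(ConfigurationKeys.WRITER_FILE_PATH, "namespace/TestTable");      // placeholder
    props.setProp(ConfigurationKeys.WRITER_FILE_NAME, "part-avro.parquet");        // placeholder
    props.setProp(ConfigurationKeys.WRITER_CODEC_TYPE, "gzip");
    // Select the Avro write path; PROTOBUF and GROUP are the other supported values.
    props.setProp(ParquetWriterConfiguration.WRITER_PARQUET_FORMAT, ParquetRecordFormat.AVRO.toString());

    DataWriter writer = new ParquetDataWriterBuilder()
        .writeTo(Destination.of(Destination.DestinationType.HDFS, props))
        .withWriterId("writer-1")
        .writeInFormat(WriterOutputFormat.PARQUET)
        .withSchema(org.apache.gobblin.test.avro.TestRecord.getClassSchema())   // Avro Schema for AVRO format
        .build();
    writer.write(avroRecord);
    writer.close();
    writer.commit();
  }

Swapping ParquetRecordFormat.AVRO for PROTOBUF or GROUP, together with the matching schema object (the generated protobuf Message class, or a parquet MessageType), selects the other two write paths handled by the builder.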
a/gobblin-modules/gobblin-parquet/build.gradle b/gobblin-modules/gobblin-parquet/build.gradle index fb56bb1..cefd633 100644 --- a/gobblin-modules/gobblin-parquet/build.gradle +++ b/gobblin-modules/gobblin-parquet/build.gradle @@ -23,6 +23,8 @@ dependencies { compile externalDependency.gson compile externalDependency.twitterParquet + compile externalDependency.twitterParquetAvro + compile externalDependency.twitterParquetProto testCompile externalDependency.testng testCompile externalDependency.mockito diff --git a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java index 4a47792..a96e079 100644 --- a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java +++ b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java @@ -17,97 +17,101 @@ package org.apache.gobblin.writer; import java.io.IOException; -import java.util.Optional; +import org.apache.avro.Schema; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; +import com.google.protobuf.Message; +import lombok.extern.slf4j.Slf4j; +import parquet.avro.AvroParquetWriter; import parquet.column.ParquetProperties; import parquet.example.data.Group; import parquet.hadoop.ParquetWriter; +import parquet.hadoop.api.WriteSupport; import parquet.hadoop.example.GroupWriteSupport; import parquet.hadoop.metadata.CompressionCodecName; +import parquet.proto.ProtoParquetWriter; import parquet.schema.MessageType; -import org.apache.gobblin.configuration.State; -import org.apache.gobblin.util.ForkOperatorUtils; +import org.apache.gobblin.parquet.writer.AbstractParquetDataWriterBuilder; +import org.apache.gobblin.parquet.writer.ParquetWriterConfiguration; +import org.apache.gobblin.parquet.writer.ParquetWriterShim; -import static org.apache.gobblin.configuration.ConfigurationKeys.LOCAL_FS_URI; -import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_CODEC_TYPE; -import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_FILE_SYSTEM_URI; -import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_PREFIX; -import static parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE; -import static parquet.hadoop.ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED; -import static parquet.hadoop.ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED; -import static parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE; - - -public class ParquetDataWriterBuilder extends FsDataWriterBuilder<MessageType, Group> { - public static final String WRITER_PARQUET_PAGE_SIZE = WRITER_PREFIX + ".parquet.pageSize"; - public static final String WRITER_PARQUET_DICTIONARY_PAGE_SIZE = WRITER_PREFIX + ".parquet.dictionaryPageSize"; - public static final String WRITER_PARQUET_DICTIONARY = WRITER_PREFIX + ".parquet.dictionary"; - public static final String WRITER_PARQUET_VALIDATE = WRITER_PREFIX + ".parquet.validate"; - public static final String WRITER_PARQUET_VERSION = WRITER_PREFIX + ".parquet.version"; - public static final String DEFAULT_PARQUET_WRITER = "v1"; - - @Override - public DataWriter<Group> build() - throws IOException { - Preconditions.checkNotNull(this.destination); - Preconditions.checkArgument(!Strings.isNullOrEmpty(this.writerId)); - Preconditions.checkNotNull(this.schema); - 
Preconditions.checkArgument(this.format == WriterOutputFormat.PARQUET); - - switch (this.destination.getType()) { - case HDFS: - return new ParquetHdfsDataWriter(this, this.destination.getProperties()); - default: - throw new RuntimeException("Unknown destination type: " + this.destination.getType()); - } - } +@Slf4j +public class ParquetDataWriterBuilder<S,D> extends AbstractParquetDataWriterBuilder<S,D> { /** - * Build a {@link ParquetWriter<Group>} for given file path with a block size. - * @param blockSize - * @param stagingFile + * Build a version-specific {@link ParquetWriter} for given {@link ParquetWriterConfiguration} + * @param writerConfiguration * @return * @throws IOException */ - public ParquetWriter<Group> getWriter(int blockSize, Path stagingFile) + @Override + public ParquetWriterShim getVersionSpecificWriter(ParquetWriterConfiguration writerConfiguration) throws IOException { - State state = this.destination.getProperties(); - int pageSize = state.getPropAsInt(getProperty(WRITER_PARQUET_PAGE_SIZE), DEFAULT_PAGE_SIZE); - int dictPageSize = state.getPropAsInt(getProperty(WRITER_PARQUET_DICTIONARY_PAGE_SIZE), DEFAULT_BLOCK_SIZE); - boolean enableDictionary = - state.getPropAsBoolean(getProperty(WRITER_PARQUET_DICTIONARY), DEFAULT_IS_DICTIONARY_ENABLED); - boolean validate = state.getPropAsBoolean(getProperty(WRITER_PARQUET_VALIDATE), DEFAULT_IS_VALIDATING_ENABLED); - String rootURI = state.getProp(WRITER_FILE_SYSTEM_URI, LOCAL_FS_URI); - Path absoluteStagingFile = new Path(rootURI, stagingFile); - CompressionCodecName codec = getCodecFromConfig(); - GroupWriteSupport support = new GroupWriteSupport(); - Configuration conf = new Configuration(); - GroupWriteSupport.setSchema(this.schema, conf); - ParquetProperties.WriterVersion writerVersion = getWriterVersion(); - return new ParquetWriter<>(absoluteStagingFile, support, codec, blockSize, pageSize, dictPageSize, enableDictionary, - validate, writerVersion, conf); - } - private ParquetProperties.WriterVersion getWriterVersion() { - return ParquetProperties.WriterVersion.fromString( - this.destination.getProperties().getProp(getProperty(WRITER_PARQUET_VERSION), DEFAULT_PARQUET_WRITER)); - } + CompressionCodecName codecName = CompressionCodecName.fromConf(writerConfiguration.getCodecName()); + ParquetProperties.WriterVersion writerVersion = ParquetProperties.WriterVersion + .fromString(writerConfiguration.getWriterVersion()); - private CompressionCodecName getCodecFromConfig() { - State state = this.destination.getProperties(); - String codecValue = Optional.ofNullable(state.getProp(getProperty(WRITER_CODEC_TYPE))) - .orElse(CompressionCodecName.SNAPPY.toString()); - return CompressionCodecName.valueOf(codecValue.toUpperCase()); - } + Configuration conf = new Configuration(); + ParquetWriter versionSpecificWriter = null; + switch (writerConfiguration.getRecordFormat()) { + case GROUP: { + GroupWriteSupport.setSchema((MessageType) this.schema, conf); + WriteSupport support = new GroupWriteSupport(); + versionSpecificWriter = new ParquetWriter<Group>( + writerConfiguration.getAbsoluteStagingFile(), + support, + codecName, + writerConfiguration.getBlockSize(), + writerConfiguration.getPageSize(), + writerConfiguration.getDictPageSize(), + writerConfiguration.isDictionaryEnabled(), + writerConfiguration.isValidate(), + writerVersion, + conf); + break; + } + case AVRO: { + versionSpecificWriter = new AvroParquetWriter( + writerConfiguration.getAbsoluteStagingFile(), + (Schema) this.schema, + codecName, + 
writerConfiguration.getBlockSize(), + writerConfiguration.getPageSize(), + writerConfiguration.isDictionaryEnabled(), + conf); + break; + } + case PROTOBUF: { + versionSpecificWriter = new ProtoParquetWriter( + writerConfiguration.getAbsoluteStagingFile(), + (Class<? extends Message>) this.schema, + codecName, + writerConfiguration.getBlockSize(), + writerConfiguration.getPageSize(), + writerConfiguration.isDictionaryEnabled(), + writerConfiguration.isValidate()); + break; + } + default: throw new RuntimeException("Record format not supported"); + } + ParquetWriter finalVersionSpecificWriter = versionSpecificWriter; + + return new ParquetWriterShim() { + @Override + public void write(Object record) + throws IOException { + finalVersionSpecificWriter.write(record); + } - private String getProperty(String key) { - return ForkOperatorUtils.getPropertyNameForBranch(key, this.getBranches(), this.getBranch()); + @Override + public void close() + throws IOException { + finalVersionSpecificWriter.close(); + } + }; } -} +} \ No newline at end of file diff --git a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java index 086b084..0f0aadb 100644 --- a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java +++ b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java @@ -21,91 +21,115 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; -import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import parquet.avro.AvroParquetReader; import parquet.example.data.Group; import parquet.example.data.simple.convert.GroupRecordConverter; import parquet.hadoop.ParquetReader; import parquet.hadoop.api.InitContext; import parquet.hadoop.api.ReadSupport; import parquet.io.api.RecordMaterializer; +import parquet.proto.ProtoParquetReader; import parquet.schema.MessageType; -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.configuration.State; - -import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_DICTIONARY; -import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_DICTIONARY_PAGE_SIZE; -import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_PAGE_SIZE; -import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_VALIDATE; +import org.apache.gobblin.parquet.writer.ParquetRecordFormat; +import org.apache.gobblin.parquet.writer.test.ParquetHdfsDataWriterTestBase; +import org.apache.gobblin.test.TestRecord; +import org.apache.gobblin.test.proto.TestRecordProtos; @Test(groups = {"gobblin.writer"}) -public class ParquetHdfsDataWriterTest { +public class ParquetHdfsDataWriterTest extends ParquetHdfsDataWriterTestBase { - private MessageType schema; - private String filePath; - private ParquetHdfsDataWriter writer; - private State properties; + public ParquetHdfsDataWriterTest() { + super(new TestConstants()); + } @BeforeMethod public void setUp() throws Exception { - // Making the staging and/or output dirs if necessary - File stagingDir = new 
File(TestConstants.TEST_STAGING_DIR); - File outputDir = new File(TestConstants.TEST_OUTPUT_DIR); - if (!stagingDir.exists()) { - boolean mkdirs = stagingDir.mkdirs(); - assert mkdirs; - } - if (!outputDir.exists()) { - boolean mkdirs = outputDir.mkdirs(); - assert mkdirs; - } - this.schema = TestConstants.PARQUET_SCHEMA; - this.filePath = getFilePath(); - this.properties = createStateWithConfig(); - this.writer = (ParquetHdfsDataWriter) getParquetDataWriterBuilder().build(); + super.setUp(); } - private String getFilePath() { - return TestConstants.TEST_EXTRACT_NAMESPACE.replaceAll("\\.", "/") + "/" + TestConstants.TEST_EXTRACT_TABLE + "/" - + TestConstants.TEST_EXTRACT_ID + "_" + TestConstants.TEST_EXTRACT_PULL_TYPE; + protected DataWriterBuilder getDataWriterBuilder() { + return new ParquetDataWriterBuilder(); } - private State createStateWithConfig() { - State properties = new State(); - properties.setProp(ConfigurationKeys.WRITER_BUFFER_SIZE, ConfigurationKeys.DEFAULT_BUFFER_SIZE); - properties.setProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, TestConstants.TEST_FS_URI); - properties.setProp(ConfigurationKeys.WRITER_STAGING_DIR, TestConstants.TEST_STAGING_DIR); - properties.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, TestConstants.TEST_OUTPUT_DIR); - properties.setProp(ConfigurationKeys.WRITER_FILE_PATH, this.filePath); - properties.setProp(ConfigurationKeys.WRITER_FILE_NAME, TestConstants.PARQUET_TEST_FILENAME); - properties.setProp(WRITER_PARQUET_DICTIONARY, true); - properties.setProp(WRITER_PARQUET_DICTIONARY_PAGE_SIZE, 1024); - properties.setProp(WRITER_PARQUET_PAGE_SIZE, 1024); - properties.setProp(WRITER_PARQUET_VALIDATE, true); - properties.setProp(ConfigurationKeys.WRITER_CODEC_TYPE, "gzip"); - return properties; + @Override + protected List<TestRecord> readParquetRecordsFromFile(File outputFile, ParquetRecordFormat format) + throws IOException { + switch (format) { + case GROUP: { + return readParquetFilesGroup(outputFile); + } + case PROTOBUF: { + return readParquetFilesProto(outputFile); + } + case AVRO: { + return readParquetFilesAvro(outputFile); + } + default: throw new RuntimeException(format + " is not supported"); + } } - private ParquetDataWriterBuilder getParquetDataWriterBuilder() { - ParquetDataWriterBuilder writerBuilder = new ParquetDataWriterBuilder(); - writerBuilder.destination = Destination.of(Destination.DestinationType.HDFS, properties); - writerBuilder.writerId = TestConstants.TEST_WRITER_ID; - writerBuilder.schema = this.schema; - writerBuilder.format = WriterOutputFormat.PARQUET; - return writerBuilder; + private List<TestRecord> readParquetFilesAvro(File outputFile) + throws IOException { + ParquetReader<org.apache.gobblin.test.avro.TestRecord> reader = null; + List<TestRecord> records = new ArrayList<>(); + try { + reader = new AvroParquetReader<>(new Path(outputFile.toString())); + for (org.apache.gobblin.test.avro.TestRecord value = reader.read(); value != null; value = reader.read()) { + records.add(new TestRecord(value.getPartition(), + value.getSequence(), + value.getPayload())); + } + } finally { + if (reader != null) { + try { + reader.close(); + } catch (Exception ex) { + System.out.println(ex.getMessage()); + } + } + } + return records; + } - private List<Group> readParquetFiles(File outputFile) + + protected List<TestRecord> readParquetFilesProto(File outputFile) + throws IOException { + ParquetReader<TestRecordProtos.TestRecordOrBuilder> reader = null; + List<TestRecord> records = new ArrayList<>(); + try { + reader = new 
ProtoParquetReader<>(new Path(outputFile.toString())); + TestRecordProtos.TestRecordOrBuilder value = reader.read(); + while (value!= null) { + records.add(new TestRecord(value.getPartition(), + value.getSequence(), + value.getPayload())); + value = reader.read(); + } + } finally { + if (reader != null) { + try { + reader.close(); + } catch (Exception ex) { + System.out.println(ex.getMessage()); + } + } + } + return records; + } + + protected List<TestRecord> readParquetFilesGroup(File outputFile) throws IOException { ParquetReader<Group> reader = null; List<Group> records = new ArrayList<>(); @@ -123,46 +147,42 @@ public class ParquetHdfsDataWriterTest { } } } - return records; + return records.stream().map(value -> new TestRecord( + value.getInteger(TestConstants.PARTITION_FIELD_NAME, 0), + value.getInteger(TestConstants.SEQUENCE_FIELD_NAME, 0), + value.getString(TestConstants.PAYLOAD_FIELD_NAME, 0) + )).collect(Collectors.toList()); } + @Test public void testWrite() throws Exception { - long firstWrite; - long secondWrite; - List<Group> records; - Group record1 = TestConstants.PARQUET_RECORD_1; - Group record2 = TestConstants.PARQUET_RECORD_2; - String filePath = TestConstants.TEST_OUTPUT_DIR + Path.SEPARATOR + this.filePath; - File outputFile = new File(filePath, TestConstants.PARQUET_TEST_FILENAME); - - this.writer.write(record1); - firstWrite = this.writer.recordsWritten(); - this.writer.write(record2); - secondWrite = this.writer.recordsWritten(); - this.writer.close(); - this.writer.commit(); - records = readParquetFiles(outputFile); - Group resultRecord1 = records.get(0); - Group resultRecord2 = records.get(1); - - Assert.assertEquals(firstWrite, 1); - Assert.assertEquals(secondWrite, 2); - Assert.assertEquals(resultRecord1.getString("name", 0), "tilak"); - Assert.assertEquals(resultRecord1.getInteger("age", 0), 22); - Assert.assertEquals(resultRecord2.getString("name", 0), "other"); - Assert.assertEquals(resultRecord2.getInteger("age", 0), 22); + super.testWrite(); + } + + @Override + protected Object getSchema(ParquetRecordFormat format) { + switch (format) { + case GROUP: { + return TestConstants.PARQUET_SCHEMA; + } + case PROTOBUF: { + return TestRecordProtos.TestRecord.class; + } + case AVRO: { + return org.apache.gobblin.test.avro.TestRecord.getClassSchema(); + } + default: + throw new RuntimeException(format.name() + " is not implemented"); + } } + @AfterClass public void tearDown() throws IOException { - // Clean up the staging and/or output directories if necessary - File testRootDir = new File(TestConstants.TEST_ROOT_DIR); - if (testRootDir.exists()) { - FileUtil.fullyDelete(testRootDir); - } + super.tearDown(); } class SimpleReadSupport extends ReadSupport<Group> { diff --git a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/TestConstants.java b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/TestConstants.java index e5bf215..6eb58dc 100644 --- a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/TestConstants.java +++ b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/TestConstants.java @@ -23,40 +23,28 @@ import parquet.schema.OriginalType; import parquet.schema.PrimitiveType; import parquet.schema.Types; +import org.apache.gobblin.parquet.writer.test.TestConstantsBase; +import org.apache.gobblin.test.TestRecord; -public class TestConstants { - public static final MessageType PARQUET_SCHEMA = Types.buildMessage() - 
.addFields(Types.required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("name"), - Types.optional(PrimitiveType.PrimitiveTypeName.INT32).named("age")).named("User"); - - public static final Group PARQUET_RECORD_1 = new SimpleGroup(PARQUET_SCHEMA); - - public static final Group PARQUET_RECORD_2 = new SimpleGroup(PARQUET_SCHEMA); - - public static final String PARQUET_TEST_FILENAME = "test.parquet"; - - public static final String TEST_FS_URI = "file:///"; - - public static final String TEST_ROOT_DIR = System.getProperty("java.io.tmpdir") + "/" + System.currentTimeMillis(); - - public static final String TEST_STAGING_DIR = TEST_ROOT_DIR + "/staging"; - public static final String TEST_OUTPUT_DIR = TEST_ROOT_DIR + "/output"; +public class TestConstants extends TestConstantsBase<Group> { - public static final String TEST_WRITER_ID = "writer-1"; - - public static final String TEST_EXTRACT_NAMESPACE = "com.linkedin.writer.test"; - - public static final String TEST_EXTRACT_ID = String.valueOf(System.currentTimeMillis()); - - public static final String TEST_EXTRACT_TABLE = "TestTable"; - - public static final String TEST_EXTRACT_PULL_TYPE = "FULL"; - - static { - PARQUET_RECORD_1.add("name", "tilak"); - PARQUET_RECORD_1.add("age", 22); - PARQUET_RECORD_2.add("name", "other"); - PARQUET_RECORD_2.add("age", 22); + public static final MessageType PARQUET_SCHEMA = Types.buildMessage() + .addFields( + Types.required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8) + .named(TestConstants.PAYLOAD_FIELD_NAME), + Types.required(PrimitiveType.PrimitiveTypeName.INT32).named(TestConstants.PARTITION_FIELD_NAME), + // Sequence field is INT32 instead of INT64, because this version of parquet only supports INT32 + Types.required(PrimitiveType.PrimitiveTypeName.INT32).named(TestConstants.SEQUENCE_FIELD_NAME)) + .named("Data"); + + @Override + public Group convertToParquetGroup(TestRecord record) { + Group group = new SimpleGroup(PARQUET_SCHEMA); + group.add(PAYLOAD_FIELD_NAME, record.getPayload()); + group.add(SEQUENCE_FIELD_NAME, Long.valueOf(record.getSequence()).intValue()); + group.add(PARTITION_FIELD_NAME, record.getPartition()); + return group; } -} + +} \ No newline at end of file diff --git a/gobblin-test-utils/build.gradle b/gobblin-test-utils/build.gradle index 3c42427..2402ac7 100644 --- a/gobblin-test-utils/build.gradle +++ b/gobblin-test-utils/build.gradle @@ -14,6 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ +plugins { + id "com.google.protobuf" version "0.8.8" + id "com.commercehub.gradle.plugin.avro-base" version "0.9.0" +} apply plugin: 'java' @@ -28,13 +32,72 @@ dependencies { compile externalDependency.lombok compile externalDependency.typesafeConfig compile externalDependency.findBugsAnnotations + compile externalDependency.protobuf + testCompile externalDependency.testng testCompile externalDependency.mockito } +sourceSets { + main { + java { + srcDir 'src/main/gen-proto' + srcDir 'src/main/gen-avro' + } + resources { + srcDir 'src/main/proto' + srcDir 'src/main/avro' + } + + } +} + +/** +clean.doFirst { + delete file("${projectDir}/src/main/gen-proto/").listFiles() + delete file("${projectDir}/src/main/gen-avro/").listFiles() +} +**/ + test { workingDir rootProject.rootDir } +protobuf { + generatedFilesBaseDir = "$projectDir/src/main/gen-proto" + protoc { + artifact = "com.google.protobuf:protoc:3.6.1" + } + plugins { + grpc { + artifact = "io.grpc:protoc-gen-grpc-java:1.19.0" + } + } + generateProtoTasks { + all()*.plugins { + grpc {} + } + } +} + +avro { + stringType = "string" +} + +task generateAvro(type: com.commercehub.gradle.plugin.avro.GenerateAvroJavaTask) { + source("src/main/avro") + outputDir = file("src/main/gen-avro") +} + +compileJava { + dependsOn tasks.generateAvro +} + +checkstyleMain.source = ['src/main/java','src/test/java'] + +findbugs { + excludeFilter = file("buildConfig/findbugs-exclude-filter.xml") +} + ext.classification="library" diff --git a/gobblin-test-utils/buildConfig/findbugs-exclude-filter.xml b/gobblin-test-utils/buildConfig/findbugs-exclude-filter.xml new file mode 100644 index 0000000..61b34c0 --- /dev/null +++ b/gobblin-test-utils/buildConfig/findbugs-exclude-filter.xml @@ -0,0 +1,6 @@ +<?xml version="1.0" encoding="UTF-8"?> +<FindBugsFilter> + <Match> + <Package name="~org\.apache\.gobblin\.test\.proto.*"/> + </Match> +</FindBugsFilter> diff --git a/gobblin-test-utils/src/main/avro/TestRecord.avsc b/gobblin-test-utils/src/main/avro/TestRecord.avsc new file mode 100644 index 0000000..0a5d520 --- /dev/null +++ b/gobblin-test-utils/src/main/avro/TestRecord.avsc @@ -0,0 +1,23 @@ +{ + "type": "record", + "name": "TestRecord", + "namespace" : "org.apache.gobblin.test.avro", + "fields": [ + { + "name": "partition", + "type": "int", + "doc": "Partition id for the record" + }, + { + "name": "sequence", + "type": "long", + "doc": "The sequence number" + }, + { + "name": "payload", + "type": "string", + "default" : "", + "doc": "Human-readable payload" + } + ] +} diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/test/TestRecord.java b/gobblin-test-utils/src/main/java/org/apache/gobblin/test/TestRecord.java similarity index 100% rename from gobblin-core-base/src/main/java/org/apache/gobblin/test/TestRecord.java rename to gobblin-test-utils/src/main/java/org/apache/gobblin/test/TestRecord.java diff --git a/gobblin-test-utils/src/main/proto/TestRecord.proto b/gobblin-test-utils/src/main/proto/TestRecord.proto new file mode 100644 index 0000000..53bafbd --- /dev/null +++ b/gobblin-test-utils/src/main/proto/TestRecord.proto @@ -0,0 +1,11 @@ +syntax = "proto2"; + +option java_package = "org.apache.gobblin.test.proto"; +option java_outer_classname = "TestRecordProtos"; + + +message TestRecord { + required int32 partition = 1; + required int64 sequence = 2; + required string payload = 3; +} diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle index f2836f7..7c83a79 100644 --- 
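The TestRecord.proto and TestRecord.avsc definitions above describe the same three-field record (partition, sequence, payload), and the protobuf and Avro Gradle plugins generate the corresponding Java classes. Below is a short sketch, with illustrative field values, of constructing that record in both generated forms; it mirrors the getProtobufMessage/getAvroMessage helpers in TestConstantsBase.

  // Sketch with illustrative values: one logical record in its two generated forms.
  // TestRecordProtos.TestRecord is produced by protoc; org.apache.gobblin.test.avro.TestRecord
  // by the Avro plugin.
  TestRecordProtos.TestRecord protoRecord = TestRecordProtos.TestRecord.newBuilder()
      .setPartition(0)        // int32 partition id
      .setSequence(1L)        // int64 sequence number
      .setPayload("value1")   // string payload
      .build();

  org.apache.gobblin.test.avro.TestRecord avroRecord = new org.apache.gobblin.test.avro.TestRecord();
  avroRecord.setPartition(0);
  avroRecord.setSequence(1L);
  avroRecord.setPayload("value1");

Either object can then be handed to a Parquet writer built for the matching ParquetRecordFormat, and the round-trip tests read them back with ProtoParquetReader and AvroParquetReader respectively.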
a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -130,7 +130,7 @@ ext.externalDependency = [
     "curatorTest": "org.apache.curator:curator-test:2.10.0",
     "hamcrest": "org.hamcrest:hamcrest-all:1.3",
     "joptSimple": "net.sf.jopt-simple:jopt-simple:4.9",
-    "protobuf": "com.google.protobuf:protobuf-java:2.5.0",
+    "protobuf": "com.google.protobuf:protobuf-java:3.6.1",
     "pegasus" : [
         "data" : "com.linkedin.pegasus:data:" + pegasusVersion,
         "generator" : "com.linkedin.pegasus:generator:" + pegasusVersion,
@@ -169,7 +169,11 @@ ext.externalDependency = [
     "hadoopAdl" : "org.apache.hadoop:hadoop-azure-datalake:3.0.0-alpha2",
     "orcMapreduce":"org.apache.orc:orc-mapreduce:1.6.1",
     'parquet': 'org.apache.parquet:parquet-hadoop:1.10.1',
+    'parquetAvro': 'org.apache.parquet:parquet-avro:1.10.1',
+    'parquetProto': 'org.apache.parquet:parquet-protobuf:1.10.1',
     'twitterParquet': 'com.twitter:parquet-hadoop-bundle:1.5.0',
+    'twitterParquetAvro': 'com.twitter:parquet-avro:1.5.0',
+    'twitterParquetProto': 'com.twitter:parquet-protobuf:1.5.0',
     'reactivex': 'io.reactivex.rxjava2:rxjava:2.1.0',
     "slf4j": [
         "org.slf4j:slf4j-api:" + slf4jVersion,
@@ -182,6 +186,7 @@ ext.externalDependency = [
     ],
     "postgresConnector": "org.postgresql:postgresql:42.1.4",
     "assertj": 'org.assertj:assertj-core:3.8.0',
+    "protobuf": 'com.google.protobuf:protobuf-java:3.6.1',
 ]

 if (!isDefaultEnvironment