Repository: incubator-gobblin Updated Branches: refs/heads/master 8af87cb78 -> 312e768f5
[GOBBLIN-255] ParquetHdfsDataWriter Closes #2106 from tilakpatidar/parquet_writer Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/312e768f Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/312e768f Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/312e768f Branch: refs/heads/master Commit: 312e768f564e7cb4619c7986cfdf9b0f828bbc7b Parents: 8af87cb Author: tilakpatidar <[email protected]> Authored: Thu Oct 5 12:11:23 2017 -0700 Committer: Abhishek Tiwari <[email protected]> Committed: Thu Oct 5 12:11:23 2017 -0700 ---------------------------------------------------------------------- .../apache/gobblin/writer/TestConstants.java | 14 +- .../Configuration-Properties-Glossary.md | 35 ++++ gobblin-modules/gobblin-parquet/build.gradle | 43 +++++ .../writer/ParquetDataWriterBuilder.java | 112 ++++++++++++ .../gobblin/writer/ParquetHdfsDataWriter.java | 70 ++++++++ .../writer/ParquetHdfsDataWriterTest.java | 178 +++++++++++++++++++ .../apache/gobblin/writer/TestConstants.java | 62 +++++++ 7 files changed, 505 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/312e768f/gobblin-core/src/test/java/org/apache/gobblin/writer/TestConstants.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/java/org/apache/gobblin/writer/TestConstants.java b/gobblin-core/src/test/java/org/apache/gobblin/writer/TestConstants.java index 8af13b8..ce0e9ab 100644 --- a/gobblin-core/src/test/java/org/apache/gobblin/writer/TestConstants.java +++ b/gobblin-core/src/test/java/org/apache/gobblin/writer/TestConstants.java @@ -25,15 +25,11 @@ package org.apache.gobblin.writer; public class TestConstants { // Test Avro schema - public static final String AVRO_SCHEMA = "{\"namespace\": \"example.avro\",\n" + - " 
\"type\": \"record\",\n" + - " \"name\": \"User\",\n" + - " \"fields\": [\n" + - " {\"name\": \"name\", \"type\": \"string\"},\n" + - " {\"name\": \"favorite_number\", \"type\": \"int\"},\n" + - " {\"name\": \"favorite_color\", \"type\": \"string\"}\n" + - " ]\n" + - "}"; + public static final String AVRO_SCHEMA = + "{\"namespace\": \"example.avro\",\n" + " \"type\": \"record\",\n" + " \"name\": \"User\",\n" + " \"fields\": [\n" + + " {\"name\": \"name\", \"type\": \"string\"},\n" + + " {\"name\": \"favorite_number\", \"type\": \"int\"},\n" + + " {\"name\": \"favorite_color\", \"type\": \"string\"}\n" + " ]\n" + "}"; // Test Avro data in json format public static final String[] JSON_RECORDS = http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/312e768f/gobblin-docs/user-guide/Configuration-Properties-Glossary.md ---------------------------------------------------------------------- diff --git a/gobblin-docs/user-guide/Configuration-Properties-Glossary.md b/gobblin-docs/user-guide/Configuration-Properties-Glossary.md index e275ca2..363d873 100644 --- a/gobblin-docs/user-guide/Configuration-Properties-Glossary.md +++ b/gobblin-docs/user-guide/Configuration-Properties-Glossary.md @@ -1059,6 +1059,41 @@ This is used to control the writer creation. If the value is set to true, writer False ###### Required No +#### writer.parquet.pageSize +###### Description +The page size threshold. +###### Default Value +1048576 +###### Required +No +#### writer.parquet.dictionaryPageSize +###### Description +The dictionary page size threshold. +###### Default Value +134217728 +###### Required +No +#### writer.parquet.dictionary +###### Description +To turn dictionary encoding on. +###### Default Value +true +###### Required +No +#### writer.parquet.validate +###### Description +To turn on validation using the schema. +###### Default Value +false +###### Required +No +#### writer.parquet.version +###### Description +Version of parquet writer to use. Available versions are v1 and v2. 
+###### Default Value +v1 +###### Required +No # Data Publisher Properties <a name="Data-Publisher-Properties"></a> #### data.publisher.type ###### Description http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/312e768f/gobblin-modules/gobblin-parquet/build.gradle ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-parquet/build.gradle b/gobblin-modules/gobblin-parquet/build.gradle new file mode 100644 index 0000000..e43f543 --- /dev/null +++ b/gobblin-modules/gobblin-parquet/build.gradle @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +apply plugin: 'java' + +dependencies { + compile project(":gobblin-core") + + compile externalDependency.parquet + + testCompile externalDependency.testng + testCompile externalDependency.mockito + testCompile externalDependency.mockRunnerJdbc +} + +configurations { + compile { transitive = false } + // Remove xerces dependencies because of versioning issues. Standard JRE implementation should + // work. 
See also http://stackoverflow.com/questions/11677572/dealing-with-xerces-hell-in-java-maven + // HADOOP-5254 and MAPREDUCE-5664 + all*.exclude group: 'xml-apis' + all*.exclude group: 'xerces' +} + +test { + workingDir rootProject.rootDir +} + +ext.classification="library" http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/312e768f/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java new file mode 100644 index 0000000..7ce2020 --- /dev/null +++ b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.gobblin.writer;

import java.io.IOException;
import java.util.Locale;
import java.util.Optional;

import org.apache.gobblin.configuration.State;
import org.apache.gobblin.util.ForkOperatorUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;

import parquet.column.ParquetProperties;
import parquet.example.data.Group;
import parquet.hadoop.ParquetWriter;
import parquet.hadoop.example.GroupWriteSupport;
import parquet.hadoop.metadata.CompressionCodecName;
import parquet.schema.MessageType;

import static org.apache.gobblin.configuration.ConfigurationKeys.LOCAL_FS_URI;
import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_CODEC_TYPE;
import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_FILE_SYSTEM_URI;
import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_PREFIX;
import static parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
import static parquet.hadoop.ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED;
import static parquet.hadoop.ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED;
import static parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE;


/**
 * A {@link FsDataWriterBuilder} that builds {@link ParquetHdfsDataWriter}s writing
 * {@link Group} records in Parquet format.
 *
 * <p>
 * The underlying {@link ParquetWriter} is tuned through the branch-aware
 * {@code writer.parquet.*} properties declared below.
 * </p>
 */
public class ParquetDataWriterBuilder extends FsDataWriterBuilder<MessageType, Group> {
  public static final String WRITER_PARQUET_PAGE_SIZE = WRITER_PREFIX + ".parquet.pageSize";
  public static final String WRITER_PARQUET_DICTIONARY_PAGE_SIZE = WRITER_PREFIX + ".parquet.dictionaryPageSize";
  public static final String WRITER_PARQUET_DICTIONARY = WRITER_PREFIX + ".parquet.dictionary";
  public static final String WRITER_PARQUET_VALIDATE = WRITER_PREFIX + ".parquet.validate";
  public static final String WRITER_PARQUET_VERSION = WRITER_PREFIX + ".parquet.version";
  public static final String DEFAULT_PARQUET_WRITER = "v1";

  /**
   * Builds a {@link ParquetHdfsDataWriter} for the configured HDFS destination.
   *
   * @throws IOException if the underlying writer cannot be created
   * @throws RuntimeException if the destination type is not HDFS
   */
  @Override
  public DataWriter<Group> build()
      throws IOException {
    Preconditions.checkNotNull(this.destination);
    Preconditions.checkArgument(!Strings.isNullOrEmpty(this.writerId));
    Preconditions.checkNotNull(this.schema);
    Preconditions.checkArgument(this.format == WriterOutputFormat.PARQUET);

    switch (this.destination.getType()) {
      case HDFS:
        return new ParquetHdfsDataWriter(this, this.destination.getProperties());
      default:
        throw new RuntimeException("Unknown destination type: " + this.destination.getType());
    }
  }

  /**
   * Builds a {@link ParquetWriter} of {@link Group}s for the given staging file.
   *
   * @param blockSize the Parquet row-group (block) size in bytes
   * @param stagingFile the staging file to write to, resolved against the writer file system URI
   * @return a configured Parquet writer
   * @throws IOException if the writer cannot be created
   */
  public ParquetWriter<Group> getWriter(int blockSize, Path stagingFile)
      throws IOException {
    State state = this.destination.getProperties();
    int pageSize = state.getPropAsInt(getProperty(WRITER_PARQUET_PAGE_SIZE), DEFAULT_PAGE_SIZE);
    // NOTE(review): the fallback here is the 128MB block-size default rather than Parquet's 1MB
    // page-size default; this matches the documented default but looks unintentionally large —
    // confirm against parquet.hadoop.ParquetWriter's own constructors.
    int dictPageSize = state.getPropAsInt(getProperty(WRITER_PARQUET_DICTIONARY_PAGE_SIZE), DEFAULT_BLOCK_SIZE);
    boolean enableDictionary =
        state.getPropAsBoolean(getProperty(WRITER_PARQUET_DICTIONARY), DEFAULT_IS_DICTIONARY_ENABLED);
    boolean validate = state.getPropAsBoolean(getProperty(WRITER_PARQUET_VALIDATE), DEFAULT_IS_VALIDATING_ENABLED);
    String rootURI = state.getProp(WRITER_FILE_SYSTEM_URI, LOCAL_FS_URI);
    Path absoluteStagingFile = new Path(rootURI, stagingFile);
    CompressionCodecName codec = getCodecFromConfig();
    GroupWriteSupport support = new GroupWriteSupport();
    Configuration conf = new Configuration();
    GroupWriteSupport.setSchema(this.schema, conf);
    ParquetProperties.WriterVersion writerVersion = getWriterVersion();
    return new ParquetWriter<>(absoluteStagingFile, support, codec, blockSize, pageSize, dictPageSize, enableDictionary,
        validate, writerVersion, conf);
  }

  /** Resolves the configured Parquet writer version ("v1" or "v2"), defaulting to v1. */
  private ParquetProperties.WriterVersion getWriterVersion() {
    return ParquetProperties.WriterVersion.fromString(
        this.destination.getProperties().getProp(getProperty(WRITER_PARQUET_VERSION), DEFAULT_PARQUET_WRITER));
  }

  /** Resolves the configured compression codec, defaulting to snappy when none is set. */
  private CompressionCodecName getCodecFromConfig() {
    State state = this.destination.getProperties();
    String codecValue = Optional.ofNullable(state.getProp(getProperty(WRITER_CODEC_TYPE)))
        .orElse(CompressionCodecName.SNAPPY.toString());
    // Upper-case with a fixed locale: the default-locale toUpperCase() mangles ASCII under
    // e.g. the Turkish locale ("gzip" -> "GZ\u0130P"), which would break valueOf().
    return CompressionCodecName.valueOf(codecValue.toUpperCase(Locale.ENGLISH));
  }

  /** Returns the branch-qualified name of the given configuration key. */
  private String getProperty(String key) {
    return ForkOperatorUtils.getPropertyNameForBranch(key, this.getBranches(), this.getBranch());
  }
}
package org.apache.gobblin.writer;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;

import parquet.example.data.Group;
import parquet.hadoop.ParquetWriter;


/**
 * An extension to {@link FsDataWriter} that writes in Parquet format in the form of {@link Group}s.
 *
 * <p>
 * The compression codec is configurable through the property
 * {@link ConfigurationKeys#WRITER_CODEC_TYPE}; when no codec is configured,
 * {@link ParquetDataWriterBuilder} falls back to snappy (the previous claim of a
 * deflate default did not match the builder's behavior).
 * </p>
 *
 * @author tilakpatidar
 */
public class ParquetHdfsDataWriter extends FsDataWriter<Group> {
  /** Underlying Parquet writer, pointed at the staging file until commit. */
  private final ParquetWriter<Group> writer;
  /** Number of records handed to {@link #writer} so far. */
  protected final AtomicLong count = new AtomicLong(0);

  public ParquetHdfsDataWriter(ParquetDataWriterBuilder builder, State state)
      throws IOException {
    super(builder, state);
    // FsDataWriter exposes the block size as a long; Parquet's row-group size is an int.
    this.writer = builder.getWriter((int) this.blockSize, this.stagingFile);
  }

  /**
   * Writes a single record and increments the written-record counter.
   *
   * @param record the Parquet {@link Group} to write
   * @throws IOException if the underlying writer fails
   */
  @Override
  public void write(Group record)
      throws IOException {
    this.writer.write(record);
    this.count.incrementAndGet();
  }

  /** Returns the number of records written so far. */
  @Override
  public long recordsWritten() {
    return this.count.get();
  }

  /**
   * Closes the Parquet writer (flushing data and footer) before closing the
   * underlying {@link FsDataWriter}; the parent close runs even if the writer
   * close throws.
   */
  @Override
  public void close()
      throws IOException {
    try {
      this.writer.close();
    } finally {
      super.close();
    }
  }
}
package org.apache.gobblin.writer;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import parquet.example.data.Group;
import parquet.example.data.simple.convert.GroupRecordConverter;
import parquet.hadoop.ParquetReader;
import parquet.hadoop.api.InitContext;
import parquet.hadoop.api.ReadSupport;
import parquet.io.api.RecordMaterializer;
import parquet.schema.MessageType;

import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_DICTIONARY;
import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_DICTIONARY_PAGE_SIZE;
import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_PAGE_SIZE;
import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_VALIDATE;


/**
 * Round-trip test for {@link ParquetHdfsDataWriter}: writes two records to a staging
 * file, commits, and reads them back with a {@link ParquetReader}.
 */
@Test(groups = {"gobblin.writer"})
public class ParquetHdfsDataWriterTest {

  private MessageType schema;
  private String filePath;
  private ParquetHdfsDataWriter writer;
  private State properties;

  @BeforeMethod
  public void setUp()
      throws Exception {
    // Create the staging and output dirs if necessary. Use TestNG assertions rather than
    // the `assert` keyword, which is silently disabled unless the JVM runs with -ea.
    File stagingDir = new File(TestConstants.TEST_STAGING_DIR);
    File outputDir = new File(TestConstants.TEST_OUTPUT_DIR);
    if (!stagingDir.exists()) {
      Assert.assertTrue(stagingDir.mkdirs(), "Could not create staging dir " + stagingDir);
    }
    if (!outputDir.exists()) {
      Assert.assertTrue(outputDir.mkdirs(), "Could not create output dir " + outputDir);
    }
    this.schema = TestConstants.PARQUET_SCHEMA;
    this.filePath = getFilePath();
    this.properties = createStateWithConfig();
    this.writer = (ParquetHdfsDataWriter) getParquetDataWriterBuilder().build();
  }

  /** Builds the extract-relative output path (namespace/table/extractId_pullType). */
  private String getFilePath() {
    return TestConstants.TEST_EXTRACT_NAMESPACE.replaceAll("\\.", "/") + "/" + TestConstants.TEST_EXTRACT_TABLE + "/"
        + TestConstants.TEST_EXTRACT_ID + "_" + TestConstants.TEST_EXTRACT_PULL_TYPE;
  }

  /** Builds writer configuration pointing at the local FS test dirs with small page sizes. */
  private State createStateWithConfig() {
    State properties = new State();
    properties.setProp(ConfigurationKeys.WRITER_BUFFER_SIZE, ConfigurationKeys.DEFAULT_BUFFER_SIZE);
    properties.setProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, TestConstants.TEST_FS_URI);
    properties.setProp(ConfigurationKeys.WRITER_STAGING_DIR, TestConstants.TEST_STAGING_DIR);
    properties.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, TestConstants.TEST_OUTPUT_DIR);
    properties.setProp(ConfigurationKeys.WRITER_FILE_PATH, this.filePath);
    properties.setProp(ConfigurationKeys.WRITER_FILE_NAME, TestConstants.PARQUET_TEST_FILENAME);
    properties.setProp(WRITER_PARQUET_DICTIONARY, true);
    properties.setProp(WRITER_PARQUET_DICTIONARY_PAGE_SIZE, 1024);
    properties.setProp(WRITER_PARQUET_PAGE_SIZE, 1024);
    properties.setProp(WRITER_PARQUET_VALIDATE, true);
    return properties;
  }

  /** Builds a {@link ParquetDataWriterBuilder} targeting HDFS with the test schema. */
  private ParquetDataWriterBuilder getParquetDataWriterBuilder() {
    ParquetDataWriterBuilder writerBuilder = new ParquetDataWriterBuilder();
    writerBuilder.destination = Destination.of(Destination.DestinationType.HDFS, properties);
    writerBuilder.writerId = TestConstants.TEST_WRITER_ID;
    writerBuilder.schema = this.schema;
    writerBuilder.format = WriterOutputFormat.PARQUET;
    return writerBuilder;
  }

  /** Reads back all records from the given Parquet output file. */
  private List<Group> readParquetFiles(File outputFile)
      throws IOException {
    ParquetReader<Group> reader = null;
    List<Group> records = new ArrayList<>();
    try {
      reader = new ParquetReader<>(new Path(outputFile.toString()), new SimpleReadSupport());
      for (Group value = reader.read(); value != null; value = reader.read()) {
        records.add(value);
      }
    } finally {
      if (reader != null) {
        try {
          reader.close();
        } catch (Exception ex) {
          // Best-effort close: a close failure must not mask the assertions, but report
          // the whole exception to stderr instead of just its message on stdout.
          System.err.println("Failed to close Parquet reader: " + ex);
        }
      }
    }
    return records;
  }

  @Test
  public void testWrite()
      throws Exception {
    long firstWrite;
    long secondWrite;
    List<Group> records;
    Group record1 = TestConstants.PARQUET_RECORD_1;
    Group record2 = TestConstants.PARQUET_RECORD_2;
    String filePath = TestConstants.TEST_OUTPUT_DIR + Path.SEPARATOR + this.filePath;
    File outputFile = new File(filePath, TestConstants.PARQUET_TEST_FILENAME);

    this.writer.write(record1);
    firstWrite = this.writer.recordsWritten();
    this.writer.write(record2);
    secondWrite = this.writer.recordsWritten();
    this.writer.close();
    this.writer.commit();
    records = readParquetFiles(outputFile);
    // Guard against IndexOutOfBounds masking the real failure when fewer records come back.
    Assert.assertEquals(records.size(), 2);
    Group resultRecord1 = records.get(0);
    Group resultRecord2 = records.get(1);

    Assert.assertEquals(firstWrite, 1);
    Assert.assertEquals(secondWrite, 2);
    Assert.assertEquals(resultRecord1.getString("name", 0), "tilak");
    Assert.assertEquals(resultRecord1.getInteger("age", 0), 22);
    Assert.assertEquals(resultRecord2.getString("name", 0), "other");
    Assert.assertEquals(resultRecord2.getInteger("age", 0), 22);
  }

  @AfterClass
  public void tearDown()
      throws IOException {
    // Clean up the staging and/or output directories if necessary.
    File testRootDir = new File(TestConstants.TEST_ROOT_DIR);
    if (testRootDir.exists()) {
      FileUtil.fullyDelete(testRootDir);
    }
  }

  /** Minimal read support that materializes records with the file's own schema. */
  class SimpleReadSupport extends ReadSupport<Group> {
    @Override
    public RecordMaterializer<Group> prepareForRead(Configuration conf, Map<String, String> metaData,
        MessageType schema, ReadContext context) {
      return new GroupRecordConverter(schema);
    }

    @Override
    public ReadContext init(InitContext context) {
      return new ReadContext(context.getFileSchema());
    }
  }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.writer; + +import parquet.example.data.Group; +import parquet.example.data.simple.SimpleGroup; +import parquet.schema.MessageType; +import parquet.schema.OriginalType; +import parquet.schema.PrimitiveType; +import parquet.schema.Types; + + +public class TestConstants { + public static final MessageType PARQUET_SCHEMA = Types.buildMessage() + .addFields(Types.required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("name"), + Types.optional(PrimitiveType.PrimitiveTypeName.INT32).named("age")).named("User"); + + public static final Group PARQUET_RECORD_1 = new SimpleGroup(PARQUET_SCHEMA); + + public static final Group PARQUET_RECORD_2 = new SimpleGroup(PARQUET_SCHEMA); + + public static final String PARQUET_TEST_FILENAME = "test.parquet"; + + public static final String TEST_FS_URI = "file:///"; + + public static final String TEST_ROOT_DIR = System.getProperty("java.io.tmpdir"); + + public static final String TEST_STAGING_DIR = TEST_ROOT_DIR + "/staging"; + + public static final String TEST_OUTPUT_DIR = TEST_ROOT_DIR + "/output"; + + public static final String TEST_WRITER_ID = "writer-1"; + + public static final String TEST_EXTRACT_NAMESPACE = "com.linkedin.writer.test"; + + public static final String TEST_EXTRACT_ID = String.valueOf(System.currentTimeMillis()); + + public static final String TEST_EXTRACT_TABLE = "TestTable"; + + public static final String TEST_EXTRACT_PULL_TYPE = "FULL"; + + static { + PARQUET_RECORD_1.add("name", "tilak"); + PARQUET_RECORD_1.add("age", 22); + 
PARQUET_RECORD_2.add("name", "other"); + PARQUET_RECORD_2.add("age", 22); + } +}
