[
https://issues.apache.org/jira/browse/BEAM-214?focusedWorklogId=105510&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105510
]
ASF GitHub Bot logged work on BEAM-214:
---------------------------------------
Author: ASF GitHub Bot
Created on: 24/May/18 08:43
Start Date: 24/May/18 08:43
Worklog Time Spent: 10m
Work Description: jbonofre closed pull request #1851: [BEAM-214] Add
ParquetIO
URL: https://github.com/apache/beam/pull/1851
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a fork-based (foreign) pull request
once it is merged, the diff is reproduced below for the sake of provenance:
diff --git a/sdks/java/io/parquet/build.gradle
b/sdks/java/io/parquet/build.gradle
new file mode 100644
index 00000000000..5a2c7eca1bb
--- /dev/null
+++ b/sdks/java/io/parquet/build.gradle
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply from: project(":").file("build_rules.gradle")
+applyJavaNature()
+
+description = "Apache Beam :: SDKs :: Java :: IO :: Parquet"
+
+/*
+ * evaluationDependsOn is specified manually so that the project named below is
+ * evaluated before this project. The stated reason is that this build
+ * references "sourceSets.test.output" directly.
+ * NOTE(review): the dependencies block below declares no dependency on the
+ * io-common test output; confirm whether this evaluationDependsOn is needed.
+ * TODO: Swap to generating test artifacts which we can then rely on instead of
+ * the test outputs directly.
+ */
+evaluationDependsOn(":beam-sdks-parent:beam-sdks-java-parent:beam-sdks-java-io-parent:beam-sdks-java-io-common")
+
+dependencies {
+ compile library.java.guava
+ shadow project(path:
":beam-sdks-parent:beam-sdks-java-parent:beam-sdks-java-core", configuration:
"shadow")
+ shadow library.java.slf4j_api
+ shadow library.java.joda_time
+ shadow library.java.findbugs_jsr305
+ shadow "org.apache.parquet:parquet-hadoop:1.9.0"
+ shadow "org.apache.parquet:parquet-avro:1.9.0"
+ shadow "org.apache.avro:avro:1.8.1"
+ shadow "org.apache.hadoop:hadoop-common:2.7.2"
+ shadow "org.apache.hadoop:hadoop-client:2.7.2"
+ testCompile project(path: ":beam-runners-parent:beam-runners-direct-java",
configuration: "shadow")
+ testCompile library.java.junit
+ testCompile library.java.hamcrest_core
+ testCompile library.java.slf4j_jdk14
+}
+
+task packageTests(type: Jar) { // jar built from this project's test output
+ from sourceSets.test.output
+ classifier = "tests"
+}
+
+artifacts.archives packageTests // publish the tests jar with the archives
diff --git a/sdks/java/io/parquet/pom.xml b/sdks/java/io/parquet/pom.xml
new file mode 100644
index 00000000000..be370c3ae75
--- /dev/null
+++ b/sdks/java/io/parquet/pom.xml
@@ -0,0 +1,120 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-io-parent</artifactId>
+ <version>2.4.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>beam-sdks-java-io-parquet</artifactId>
+ <name>Apache Beam :: SDKs :: Java :: IO :: Parquet</name>
+ <description>IO to read and write on Parquet storage format.</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-hadoop</artifactId>
+ <version>1.9.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>1.8.1</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-avro</artifactId>
+ <version>1.9.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>2.7.2</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>2.7.2</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- compile dependencies -->
+ <dependency>
+ <groupId>com.google.auto.value</groupId>
+ <artifactId>auto-value</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- test dependencies -->
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-direct-java</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git
a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
new file mode 100644
index 00000000000..ff09126003b
--- /dev/null
+++
b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.parquet;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.hadoop.crypto.key.kms.KMSClientProvider.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.joda.time.Duration;
+
+/**
+ * IO to read and write Parquet files.
+ *
+ * <h3>Reading Parquet files</h3>
+ *
+ * <p>{@link ParquetIO} source returns a {@link PCollection} for
+ * Parquet files. The elements in the {@link PCollection} are Avro {@link
GenericRecord}.
+ *
+ * <p>To configure the {@link Read}, you have to provide the file patterns
(from) of the
+ * Parquet files and the Avro schema.
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * pipeline.apply(ParquetIO.read().from("/foo/bar").withSchema(schema))
+ * ...
+ * }
+ * </pre>
+ *
+ * <p>As {@link Read} is based on {@link FileIO}, it supports any filesystem
(hdfs, ...).
+ *
+ * <h3>Writing Parquet files</h3>
+ *
+ * <p>{@link Write} allows you to write a {@link PCollection} of {@link
GenericRecord} into a
+ * Parquet file.
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * pipeline
+ * .apply(...) // PCollection<GenericRecord>
+ * .apply(ParquetIO.write().withPath("/foo/bar").withSchema(schema));
+ * }</pre>
+ */
+public class ParquetIO {
+
+ /**
+ * Reads {@link GenericRecord} from a Parquet file (or multiple Parquet
files matching
+ * the pattern).
+ */
+ public static Read read() {
+ return new
AutoValue_ParquetIO_Read.Builder().setHintMatchesManyFiles(false)
+
.setMatchConfiguration(FileIO.MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
+ .build();
+ }
+
+  /**
+   * Variant of {@link #read()} whose filepatterns come from the input {@link PCollection}:
+   * every element of the input is treated as a filepattern to read.
+   *
+   * <p>The returned transform starts out with {@link EmptyMatchTreatment#ALLOW_IF_WILDCARD}.
+   */
+  public static ReadAll readAll() {
+    FileIO.MatchConfiguration allowEmptyWildcard =
+        FileIO.MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD);
+    ReadAll.Builder defaults = new AutoValue_ParquetIO_ReadAll.Builder();
+    defaults.setMatchConfiguration(allowEmptyWildcard);
+    return defaults.build();
+  }
+
+ /**
+ * Like {@link #read()}, but reads each file in a {@link PCollection}
+ * of {@link org.apache.beam.sdk.io.FileIO.ReadableFile},
+ * which allows more flexible usage.
+ */
+ public static ReadFiles readFiles() {
+ return new AutoValue_ParquetIO_ReadFiles.Builder().build();
+ }
+
+ /**
+ * Writes a {@link PCollection} to an Parquet file.
+ */
+ public static Write write() {
+ return new AutoValue_ParquetIO_Write.Builder().build();
+ }
+
+  /**
+   * Implementation of {@link #read()}.
+   *
+   * <p>Expansion wraps the configured filepattern in a single-element {@link PCollection} and
+   * applies {@link ReadAll} to it, passing along the match configuration and the schema.
+   */
+  @AutoValue
+  public abstract static class Read extends PTransform<PBegin, PCollection<GenericRecord>> {
+
+    /** Filepattern to read; {@code null} until {@code from(...)} has been called. */
+    @Nullable abstract ValueProvider<String> filepattern();
+
+    /** Configuration used when matching the filepattern. */
+    abstract FileIO.MatchConfiguration matchConfiguration();
+
+    /** Avro schema of the records; {@code null} until {@code withSchema(...)} has been called. */
+    @Nullable abstract Schema schema();
+
+    // NOTE(review): expand() never consults this flag, so the hint currently has no effect.
+    abstract boolean hintMatchesManyFiles();
+
+    abstract Builder builder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setFilepattern(ValueProvider<String> filepattern);
+      abstract Builder setMatchConfiguration(FileIO.MatchConfiguration matchConfiguration);
+      abstract Builder setSchema(Schema schema);
+      abstract Builder setHintMatchesManyFiles(boolean hintManyFiles);
+
+      abstract Read build();
+    }
+
+    /**
+     * Reads from the given filename or filepattern.
+     *
+     * <p>If it is known that the filepattern will match a very large number of files (at least
+     * tens of thousands), use {@link #withHintMatchesManyFiles} for better performance and
+     * scalability.
+     */
+    public Read from(ValueProvider<String> filepattern) {
+      return builder().setFilepattern(filepattern).build();
+    }
+
+    /** Like {@link #from(ValueProvider)}, for a filepattern known at construction time. */
+    public Read from(String filepattern) {
+      return from(ValueProvider.StaticValueProvider.of(filepattern));
+    }
+
+    /**
+     * Sets the Avro schema of the records stored in the Parquet file(s).
+     */
+    public Read withSchema(Schema schema) {
+      return builder().setSchema(schema).build();
+    }
+
+    /** Sets the {@link FileIO.MatchConfiguration}. */
+    public Read withMatchConfiguration(FileIO.MatchConfiguration matchConfiguration) {
+      return builder().setMatchConfiguration(matchConfiguration).build();
+    }
+
+    /** Configures whether or not a filepattern matching no files is allowed. */
+    public Read withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return withMatchConfiguration(matchConfiguration().withEmptyMatchTreatment(treatment));
+    }
+
+    /**
+     * Continuously watches for new files matching the filepattern, polling it at the given
+     * interval, until the given termination condition is reached. The returned
+     * {@link PCollection} is unbounded.
+     *
+     * <p>This works only in runners supporting {@link Experimental.Kind#SPLITTABLE_DO_FN}.
+     */
+    @Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
+    public Read watchForNewFiles(
+        Duration pollInterval, Watch.Growth.TerminationCondition<String, ?> terminationCondition) {
+      return withMatchConfiguration(
+          matchConfiguration().continuously(pollInterval, terminationCondition));
+    }
+
+    /**
+     * Hints that the filepattern specified in {@link #from(String)} matches a very large number
+     * of files.
+     *
+     * <p>This hint may cause a runner to execute the transform differently, in a way that
+     * improves performance for this case, but it may worsen performance if the filepattern
+     * matches only a small number of files (e.g., in a runner that supports dynamic work
+     * rebalancing, it will happen less efficiently within individual files).
+     */
+    public Read withHintMatchesManyFiles() {
+      return builder().setHintMatchesManyFiles(true).build();
+    }
+
+    /**
+     * Validates the configuration and expands into {@link ReadAll}.
+     *
+     * @throws IllegalArgumentException if no filepattern or no schema has been configured
+     */
+    @Override
+    public PCollection<GenericRecord> expand(PBegin input) {
+      // Guava's checkArgument is used here on purpose: validation of a Beam transform should
+      // not go through a helper that belongs to the Hadoop KMS client.
+      checkArgument(
+          filepattern() != null, "ParquetIO.read() requires a filepattern; call from(...)");
+      checkArgument(schema() != null, "ParquetIO.read() requires a schema; call withSchema(...)");
+
+      ReadAll readAll =
+          readAll().withMatchConfiguration(matchConfiguration()).withSchema(schema());
+      return input
+          .apply("Create filepattern", Create.ofProvider(filepattern(), StringUtf8Coder.of()))
+          .apply("Via ReadAll", readAll);
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .addIfNotNull(
+              DisplayData.item("filePattern", filepattern()).withLabel("Input File Pattern"))
+          .include("matchConfiguration", matchConfiguration());
+    }
+  }
+
+ /**
+ * Implementation of {@link #readAll()}.
+ */
+ @AutoValue
+ public abstract static class ReadAll extends PTransform<PCollection<String>,
+ PCollection<GenericRecord>> {
+
+ abstract FileIO.MatchConfiguration matchConfiguration();
+ @Nullable abstract Schema schema();
+
+ abstract Builder builder();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setMatchConfiguration(FileIO.MatchConfiguration
matchConfiguration);
+ abstract Builder setSchema(Schema schema);
+
+ abstract ReadAll build();
+ }
+
+ /**
+ * Sets the {@link org.apache.beam.sdk.io.FileIO.MatchConfiguration}.
+ */
+ public ReadAll withMatchConfiguration(FileIO.MatchConfiguration
configuration) {
+ return builder().setMatchConfiguration(configuration).build();
+ }
+
+ /**
+ * Sets the {@link EmptyMatchTreatment}.
+ */
+ public ReadAll withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+ return
withMatchConfiguration(matchConfiguration().withEmptyMatchTreatment(treatment));
+ }
+
+ /**
+ * Sets the schema of the records.
+ */
+ public ReadAll withSchema(Schema schema) {
+ return builder().setSchema(schema).build();
+ }
+
+ @Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
+ public ReadAll watchForNewFiles(
+ Duration pollInterval, Watch.Growth.TerminationCondition<String, ?>
terminationCondition) {
+ return withMatchConfiguration(
+ matchConfiguration().continuously(pollInterval,
terminationCondition));
+ }
+
+ @Override
+ public PCollection<GenericRecord> expand(PCollection<String> input) {
+ checkNotNull(schema(), "schema");
+ return input
+ .apply(FileIO.matchAll().withConfiguration(matchConfiguration()))
+ .apply(FileIO.readMatches())
+ .apply(readFiles().withSchema(schema()));
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder.include("matchConfiguration", matchConfiguration());
+ }
+
+ }
+
+ /**
+ * Implementation of {@link #readFiles()}.
+ */
+ @AutoValue
+ public abstract static class ReadFiles extends
PTransform<PCollection<FileIO.ReadableFile>,
+ PCollection<GenericRecord>> {
+
+ @Nullable abstract Schema schema();
+
+ abstract Builder builder();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setSchema(Schema schema);
+ abstract ReadFiles build();
+ }
+ /**
+ * Define the Avro schema of the record to read from the Parquet file.
+ */
+ public ReadFiles withSchema(Schema schema) {
+ checkArgument(schema != null,
+ "schema can not be null");
+ return builder().setSchema(schema).build();
+ }
+
+ @Override
+ public PCollection<GenericRecord> expand(PCollection<FileIO.ReadableFile>
input) {
+ return input
+ .apply(ParDo.of(new ReadFn()))
+ .setCoder(AvroCoder.of(schema()));
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder.add(DisplayData.item("schema", schema().toString()));
+ }
+
+ static class ReadFn extends DoFn<FileIO.ReadableFile, GenericRecord> {
+
+ @ProcessElement
+ public void processElement(ProcessContext processContext) throws
Exception {
+ Path path = new
Path(processContext.element().getMetadata().resourceId().toString());
+ try (ParquetReader<GenericRecord> reader =
+ AvroParquetReader.<GenericRecord>builder(path).build()) {
+ GenericRecord read;
+ while ((read = reader.read()) != null) {
+ processContext.output(read);
+ }
+ }
+ }
+
+ }
+
+ }
+
+  /**
+   * {@link PTransform} writing a {@link PCollection} of {@link GenericRecord} into a Parquet
+   * file.
+   *
+   * <p>The schema is held in its JSON string form and parsed back into a {@link Schema} when
+   * the writing {@link DoFn} is set up.
+   */
+  @AutoValue
+  public abstract static class Write extends PTransform<PCollection<GenericRecord>, PDone> {
+
+    /** Location of the file to write; {@code null} until {@link #withPath} has been called. */
+    @Nullable abstract String path();
+
+    /** JSON form of the Avro schema; {@code null} until {@code withSchema} has been called. */
+    @Nullable abstract String schema();
+
+    abstract Builder builder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setPath(String path);
+      abstract Builder setSchema(String schema);
+      abstract Write build();
+    }
+
+    /**
+     * Define the location (path) of the Parquet file to write.
+     *
+     * @throws IllegalArgumentException if {@code path} is {@code null}
+     */
+    public Write withPath(String path) {
+      checkArgument(path != null, "ParquetIO.write().withPath(path) called with null path");
+      return builder().setPath(path).build();
+    }
+
+    /**
+     * Define the Avro schema, in its JSON string form, of the Avro {@link GenericRecord} to be
+     * written in the Parquet file.
+     *
+     * @throws IllegalArgumentException if {@code schema} is {@code null}
+     */
+    public Write withSchema(String schema) {
+      checkArgument(schema != null,
+          "ParquetIO.write().withSchema(schema) called with null schema");
+      return builder().setSchema(schema).build();
+    }
+
+    /**
+     * Like {@link #withSchema(String)}, taking the Avro {@link Schema} itself; the schema is
+     * stored as the string returned by {@link Schema#toString()}.
+     *
+     * @throws IllegalArgumentException if {@code schema} is {@code null}
+     */
+    public Write withSchema(Schema schema) {
+      checkArgument(schema != null,
+          "ParquetIO.write().withSchema(schema) called with null schema");
+      return withSchema(schema.toString());
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      // Both values are optional until expand(); display data must not fail when they are unset.
+      builder.addIfNotNull(DisplayData.item("path", path()));
+      builder.addIfNotNull(DisplayData.item("schema", schema()));
+    }
+
+    /**
+     * Validates the configuration and applies the writing {@link DoFn}.
+     *
+     * @throws IllegalArgumentException if no path or no schema has been configured
+     */
+    @Override
+    public PDone expand(PCollection<GenericRecord> input) {
+      // Fail at pipeline construction rather than inside WriteFn.setup() on a worker.
+      checkArgument(path() != null, "ParquetIO.write() requires a path; call withPath(...)");
+      checkArgument(schema() != null, "ParquetIO.write() requires a schema; call withSchema(...)");
+      input.apply(ParDo.of(new WriteFn(this)));
+      return PDone.in(input.getPipeline());
+    }
+
+    /**
+     * Opens one Parquet writer per {@link DoFn} instance and appends every element to it.
+     *
+     * <p>NOTE(review): every instance opens the same configured path; confirm how this is
+     * expected to behave when a runner creates more than one instance of this function.
+     */
+    static class WriteFn extends DoFn<GenericRecord, Void> {
+
+      private final Write spec;
+      // Created in setup(); not serialized together with the function.
+      private transient ParquetWriter<GenericRecord> writer;
+
+      public WriteFn(Write spec) {
+        this.spec = spec;
+      }
+
+      @Setup
+      public void setup() throws Exception {
+        Path path = new Path(spec.path());
+        Schema schema = new Schema.Parser().parse(spec.schema());
+        writer = AvroParquetWriter.<GenericRecord>builder(path).withSchema(schema).build();
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext processContext) throws Exception {
+        GenericRecord record = processContext.element();
+        writer.write(record);
+      }
+
+      @Teardown
+      public void teardown() throws Exception {
+        // setup() may have failed before the writer was created.
+        if (writer != null) {
+          writer.close();
+          writer = null;
+        }
+      }
+
+    }
+
+  }
+
+}
diff --git
a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/package-info.java
b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/package-info.java
new file mode 100644
index 00000000000..e22d3c580bf
--- /dev/null
+++
b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Transforms for reading and writing from Parquet.
+ */
+package org.apache.beam.sdk.io.parquet;
diff --git
a/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java
b/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java
new file mode 100644
index 00000000000..db606ca3a60
--- /dev/null
+++
b/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.parquet;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.Serializable;
+import java.util.ArrayList;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Tests for {@link ParquetIO}.
+ */
+public class ParquetIOTest implements Serializable {
+
+  /** JSON form of the single-field Avro record schema shared by the tests. */
+  private static final String SCHEMA_JSON =
+      "{\"type\":\"record\", \"name\":\"testrecord\","
+          + "\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}";
+
+  /** Names cycled through when generating records. */
+  private static final String[] SCIENTISTS = {
+      "Einstein", "Darwin", "Copernicus", "Pasteur", "Curie", "Faraday",
+      "Newton", "Bohr", "Galilei", "Maxwell"};
+
+  /** Number of records generated per test; a multiple of the number of names. */
+  private static final int RECORD_COUNT = 1000;
+
+  @Rule
+  public transient TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Rule
+  public transient TestPipeline pipeline = TestPipeline.create();
+
+  /** Builds {@link #RECORD_COUNT} records whose "name" field cycles through the names. */
+  private static ArrayList<GenericRecord> generateRecords(Schema schema) {
+    GenericRecordBuilder builder = new GenericRecordBuilder(schema);
+    ArrayList<GenericRecord> records = new ArrayList<>();
+    for (int i = 0; i < RECORD_COUNT; i++) {
+      records.add(builder.set("name", SCIENTISTS[i % SCIENTISTS.length]).build());
+    }
+    return records;
+  }
+
+  /** Writes a file with the Parquet API directly and reads it back with ParquetIO.read(). */
+  @Test
+  public void testRead() throws Exception {
+    File file = temporaryFolder.newFile("testread.parquet");
+    // Only the path is needed: the empty file is removed before the writer creates it.
+    file.delete();
+    Path path = new Path(file.toString());
+
+    Schema schema = new Schema.Parser().parse(SCHEMA_JSON);
+
+    try (ParquetWriter<GenericRecord> writer =
+        AvroParquetWriter.<GenericRecord>builder(path).withSchema(schema).build()) {
+      for (GenericRecord record : generateRecords(schema)) {
+        writer.write(record);
+      }
+    }
+
+    PCollection<GenericRecord> output =
+        pipeline.apply(ParquetIO.read().from(file.toString()).withSchema(schema));
+
+    PAssert.thatSingleton(output.apply("Count All", Count.<GenericRecord>globally()))
+        .isEqualTo(1000L);
+
+    PCollection<KV<String, GenericRecord>> mapped =
+        output.apply(MapElements.via(
+            new SimpleFunction<GenericRecord, KV<String, GenericRecord>>() {
+              @Override
+              public KV<String, GenericRecord> apply(GenericRecord record) {
+                String name = record.get("name").toString();
+                return KV.of(name, record);
+              }
+            }));
+
+    PAssert.that(mapped.apply("Count Scientist", Count.<String, GenericRecord>perKey()))
+        .satisfies(new SerializableFunction<Iterable<KV<String, Long>>, Void>() {
+          @Override
+          public Void apply(Iterable<KV<String, Long>> input) {
+            for (KV<String, Long> element : input) {
+              // 1000 records over 10 names: each name must occur exactly 100 times.
+              assertEquals(element.getKey(), 100L, element.getValue().longValue());
+            }
+            return null;
+          }
+        });
+
+    pipeline.run();
+  }
+
+  /** Writes with ParquetIO.write() and counts the records with the Parquet API directly. */
+  @Ignore
+  @Test
+  public void testWrite() throws Exception {
+    File file = temporaryFolder.newFile("testwrite.parquet");
+    // Only the path is needed: the empty file is removed before the writer creates it.
+    file.delete();
+
+    Schema schema = new Schema.Parser().parse(SCHEMA_JSON);
+    ArrayList<GenericRecord> data = generateRecords(schema);
+
+    pipeline.apply(Create.of(data).withCoder(AvroCoder.of(schema)))
+        .apply(ParquetIO.write().withPath(file.toString()).withSchema(schema.toString()));
+
+    pipeline.run();
+
+    Path path = new Path(file.toString());
+
+    int count = 0;
+    // The reader is closed even when reading fails part-way through.
+    try (ParquetReader<GenericRecord> reader =
+        AvroParquetReader.<GenericRecord>builder(path).build()) {
+      while (reader.read() != null) {
+        count++;
+      }
+    }
+
+    assertEquals(RECORD_COUNT, count);
+  }
+
+}
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index e57abe0b9e1..4be92c4332f 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -61,6 +61,7 @@
<module>kinesis</module>
<module>mongodb</module>
<module>mqtt</module>
+ <module>parquet</module>
<module>redis</module>
<module>solr</module>
<module>tika</module>
diff --git a/settings.gradle b/settings.gradle
index 7226273053e..b532bca8947 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -69,6 +69,7 @@ include ":sdks:java:io:kafka"
include ":sdks:java:io:kinesis"
include ":sdks:java:io:mongodb"
include ":sdks:java:io:mqtt"
+include ":sdks:java:io:parquet"
include ":sdks:java:io:redis"
include ":sdks:java:io:solr"
include ":sdks:java:io:tika"
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 105510)
Time Spent: 15h 40m (was: 15.5h)
> Create Parquet IO
> -----------------
>
> Key: BEAM-214
> URL: https://issues.apache.org/jira/browse/BEAM-214
> Project: Beam
> Issue Type: Improvement
> Components: io-ideas
> Reporter: Neville Li
> Assignee: Jean-Baptiste Onofré
> Priority: Minor
> Time Spent: 15h 40m
> Remaining Estimate: 0h
>
> Would be nice to support Parquet files with projection and predicates.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)