[ 
https://issues.apache.org/jira/browse/BEAM-214?focusedWorklogId=105158&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105158
 ]

ASF GitHub Bot logged work on BEAM-214:
---------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/May/18 15:37
            Start Date: 23/May/18 15:37
    Worklog Time Spent: 10m 
      Work Description: jkff closed pull request #5242: [BEAM-214] ParquetIO
URL: https://github.com/apache/beam/pull/5242
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/.test-infra/jenkins/job_PerformanceTests_FileBasedIO_IT.groovy 
b/.test-infra/jenkins/job_PerformanceTests_FileBasedIO_IT.groovy
index dc382c1f9bd..7c4ebf63b07 100644
--- a/.test-infra/jenkins/job_PerformanceTests_FileBasedIO_IT.groovy
+++ b/.test-infra/jenkins/job_PerformanceTests_FileBasedIO_IT.groovy
@@ -76,6 +76,18 @@ def testsConfigurations = [
                         numberOfRecords: '100000000',
                         charset: 'UTF-8'
                 ]
+        ],
+        [
+                jobName           : 'beam_PerformanceTests_ParquetIOIT',
+                jobDescription    : 'Runs PerfKit tests for 
beam_PerformanceTests_ParquetIOIT',
+                itClass           : 
'org.apache.beam.sdk.io.parquet.ParquetIOIT',
+                bqTable           : 'beam_performance.parquetioit_pkb_results',
+                prCommitStatusName: 'Java ParquetIOPerformance Test',
+                prTriggerPhase    : 'Run Java ParquetIO Performance Test',
+                extraPipelineArgs: [
+                        numberOfRecords: '100000000',
+                        charset: 'UTF-8'
+                ]
         ]
 ]
 
diff --git a/sdks/java/io/file-based-io-tests/build.gradle 
b/sdks/java/io/file-based-io-tests/build.gradle
index f84efa5ec20..98e85520a6a 100644
--- a/sdks/java/io/file-based-io-tests/build.gradle
+++ b/sdks/java/io/file-based-io-tests/build.gradle
@@ -29,6 +29,7 @@ dependencies {
   shadowTest project(":beam-sdks-java-io-common")
   shadowTest project(path: ":beam-sdks-java-io-common", configuration: 
"shadowTest")
   shadowTest project(":beam-sdks-java-io-xml")
+  shadowTest project(":beam-sdks-java-io-parquet")
   shadowTest library.java.guava
   shadowTest library.java.junit
   shadowTest library.java.hamcrest_core
diff --git a/sdks/java/io/file-based-io-tests/pom.xml 
b/sdks/java/io/file-based-io-tests/pom.xml
index 3de4ba55ae1..e87494efa7a 100644
--- a/sdks/java/io/file-based-io-tests/pom.xml
+++ b/sdks/java/io/file-based-io-tests/pom.xml
@@ -359,5 +359,10 @@
             <artifactId>beam-sdks-java-io-xml</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.beam</groupId>
+            <artifactId>beam-sdks-java-io-parquet</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git 
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
 
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
index 5b7ff38ee34..13694c9b996 100644
--- 
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
+++ 
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
@@ -59,6 +59,7 @@ public static String appendTimestampSuffix(String text) {
 
   public static String getExpectedHashForLineCount(int lineCount) {
     Map<Integer, String> expectedHashes = ImmutableMap.of(
+        1000, "8604c70b43405ef9803cb49b77235ea2",
         100_000, "4c8bb3b99dcc59459b20fefba400d446",
         1_000_000, "9796db06e7a7960f974d5a91164afff1",
         100_000_000, "6ce05f456e2fdc846ded2abd0ec1de95"
diff --git 
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOIT.java
 
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOIT.java
new file mode 100644
index 00000000000..bc9b67c4bde
--- /dev/null
+++ 
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOIT.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.parquet;
+
+import static 
org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampSuffix;
+import static 
org.apache.beam.sdk.io.common.FileBasedIOITHelper.getExpectedHashForLineCount;
+import static 
org.apache.beam.sdk.io.common.FileBasedIOITHelper.readTestPipelineOptions;
+import static org.apache.beam.sdk.values.TypeDescriptors.strings;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.common.FileBasedIOITHelper;
+import org.apache.beam.sdk.io.common.HashingFn;
+import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Integration tests for {@link org.apache.beam.sdk.io.parquet.ParquetIO}.
+ *
+ * <p>Run this test using the command below. Pass in connection information 
via PipelineOptions:
+ * <pre>
+ *  ./gradlew integrationTest -p sdks/java/io/file-based-io-tests
+ *  -DintegrationTestPipelineOptions='[
+ *  "--numberOfRecords=100000",
+ *  "--filenamePrefix=output_file_path",
+ *  ]'
+ *  --tests org.apache.beam.sdk.io.parquet.ParquetIOIT
+ *  -DintegrationTestRunner=direct
+ * </pre>
+ * </p>
+ *
+ * <p>Please see 'build_rules.gradle' file for instructions regarding
+ * running this test using Beam performance testing framework.</p>
+ */
+@RunWith(JUnit4.class)
+public class ParquetIOIT {
+
+  private static final Schema SCHEMA = new Schema.Parser().parse("{\n"
+    + " \"namespace\": \"ioitavro\",\n"
+    + " \"type\": \"record\",\n"
+    + " \"name\": \"TestAvroLine\",\n"
+    + " \"fields\": [\n"
+    + "     {\"name\": \"row\", \"type\": \"string\"}\n"
+    + " ]\n"
+    + "}");
+
+  private static String filenamePrefix;
+  private static Integer numberOfRecords;
+
+  @Rule
+  public TestPipeline pipeline = TestPipeline.create();
+
+  @BeforeClass
+  public static void setup() {
+    IOTestPipelineOptions options = readTestPipelineOptions();
+
+    numberOfRecords = options.getNumberOfRecords();
+    filenamePrefix = appendTimestampSuffix(options.getFilenamePrefix());
+  }
+
+  @Test
+  public void writeThenReadAll() {
+    PCollection<String> testFiles =
+      pipeline.apply("Generate sequence", 
GenerateSequence.from(0).to(numberOfRecords))
+      .apply("Produce text lines",
+        ParDo.of(new 
FileBasedIOITHelper.DeterministicallyConstructTestTextLineFn()))
+      .apply("Produce Avro records", ParDo.of(new 
DeterministicallyConstructAvroRecordsFn()))
+        .setCoder(AvroCoder.of(SCHEMA))
+      .apply("Write Parquet files",
+        
FileIO.<GenericRecord>write().via(ParquetIO.sink(SCHEMA)).to(filenamePrefix))
+      .getPerDestinationOutputFilenames()
+      .apply("Get file names", Values.create());
+
+    PCollection<String> consolidatedHashcode = testFiles.apply("Find files", 
FileIO.matchAll())
+      .apply("Read matched files", FileIO.readMatches())
+      .apply("Read parquet files", ParquetIO.readFiles(SCHEMA))
+      .apply("Map records to strings", MapElements.into(strings())
+        .via((SerializableFunction<GenericRecord, String>) record -> String
+          .valueOf(record.get("row"))))
+      .apply("Calculate hashcode", Combine.globally(new HashingFn()));
+
+    String expectedHash = getExpectedHashForLineCount(numberOfRecords);
+    PAssert.thatSingleton(consolidatedHashcode).isEqualTo(expectedHash);
+
+    testFiles.apply("Delete test files", ParDo.of(new 
FileBasedIOITHelper.DeleteFileFn())
+      .withSideInputs(consolidatedHashcode.apply(View.asSingleton())));
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  private static class DeterministicallyConstructAvroRecordsFn extends 
DoFn<String, GenericRecord> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(
+        new GenericRecordBuilder(SCHEMA).set("row", c.element()).build()
+      );
+    }
+  }
+}
diff --git a/sdks/java/io/parquet/build.gradle 
b/sdks/java/io/parquet/build.gradle
new file mode 100644
index 00000000000..6e274a838f1
--- /dev/null
+++ b/sdks/java/io/parquet/build.gradle
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply from: project(":").file("build_rules.gradle")
+applyJavaNature()
+
+description = "Apache Beam :: SDKs :: Java :: IO :: Parquet"
+ext.summary = "IO to read and write on Parquet storage format."
+
+dependencies {
+  compile library.java.guava
+  shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
+  shadow library.java.slf4j_api
+  shadow library.java.joda_time
+  shadow library.java.findbugs_jsr305
+  shadow "org.apache.parquet:parquet-hadoop:1.10.0"
+  shadow "org.apache.parquet:parquet-avro:1.10.0"
+  shadow "org.apache.avro:avro:1.8.1"
+  shadow "org.apache.hadoop:hadoop-common:2.7.2"
+  shadow "org.apache.hadoop:hadoop-client:2.7.2"
+  testCompile project(path: ":beam-sdks-java-core", configuration: 
"shadowTest")
+  testCompile project(path: ":beam-runners-direct-java", configuration: 
"shadow")
+  testCompile library.java.junit
+  testCompile library.java.hamcrest_core
+  testCompile library.java.slf4j_jdk14
+}
diff --git a/sdks/java/io/parquet/pom.xml b/sdks/java/io/parquet/pom.xml
new file mode 100644
index 00000000000..87c1ee1c4d5
--- /dev/null
+++ b/sdks/java/io/parquet/pom.xml
@@ -0,0 +1,126 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-java-io-parent</artifactId>
+    <version>2.4.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-sdks-java-io-parquet</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: IO :: Parquet</name>
+  <description>IO to read and write on Parquet storage format.</description>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-hadoop</artifactId>
+      <version>1.10.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+      <version>1.8.1</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-avro</artifactId>
+      <version>1.10.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>2.7.2</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <version>2.7.2</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <!-- compile dependencies -->
+    <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <!-- test dependencies -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>
diff --git 
a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
 
b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
new file mode 100644
index 00000000000..e18334859a9
--- /dev/null
+++ 
b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.parquet;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
+
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import javax.annotation.Nullable;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.io.DelegatingSeekableInputStream;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.PositionOutputStream;
+import org.apache.parquet.io.SeekableInputStream;
+
+/**
+ * IO to read and write Parquet files.
+ *
+ * <h3>Reading Parquet files</h3>
+ *
+ * <p>{@link ParquetIO} source returns a {@link PCollection} for
+ * Parquet files. The elements in the {@link PCollection} are Avro {@link 
GenericRecord}.
+ *
+ * <p>To configure the {@link Read}, you have to provide the file patterns 
(from) of the Parquet
+ * files and the schema.
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ *  PCollection<GenericRecord> records = 
pipeline.apply(ParquetIO.read(SCHEMA).from("/foo/bar"));
+ *  ...
+ * }</pre>
+ *
+ * <p>As {@link Read} is based on {@link FileIO}, it supports any filesystem 
(hdfs, ...).
+ *
+ * <p>For more advanced use cases, like reading each file in a {@link 
PCollection}
+ * of {@link FileIO.ReadableFile}, use the {@link ReadFiles} transform.
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ *  PCollection<FileIO.ReadableFile> files = pipeline
+ *    .apply(FileIO.match().filepattern(options.getInputFilepattern())
+ *    .apply(FileIO.readMatches());
+ *
+ *  PCollection<GenericRecord> output = 
files.apply(ParquetIO.readFiles(SCHEMA));
+ * }</pre>
+ *
+ *
+ * <h3>Writing Parquet files</h3>
+ *
+ * <p>{@link ParquetIO.Sink} allows you to write a {@link PCollection} of 
{@link GenericRecord}
+ * into a Parquet file. It can be used with the general-purpose {@link FileIO} 
transforms
+ * with FileIO.write/writeDynamic specifically.
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ *  pipeline
+ *    .apply(...) // PCollection<GenericRecord>
+ *    .apply(FileIO.<GenericRecord>
+ *      .write()
+ *      .via(ParquetIO.sink(SCHEMA))
+ *      .to("destination/path")
+ * }</pre>
+ *
+ * <p>This IO API is considered experimental and may break or receive
+ * backwards-incompatible changes in future versions of the Apache Beam SDK.
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class ParquetIO {
+
+  /**
+   * Reads {@link GenericRecord} from a Parquet file (or multiple Parquet 
files matching
+   * the pattern).
+   */
+  public static Read read(Schema schema) {
+    return new AutoValue_ParquetIO_Read.Builder()
+      .setSchema(schema)
+      .build();
+  }
+
+  /**
+   * Like {@link #read(Schema)}, but reads each file in a {@link PCollection}
+   * of {@link org.apache.beam.sdk.io.FileIO.ReadableFile},
+   * which allows more flexible usage.
+   */
+  public static ReadFiles readFiles(Schema schema) {
+    return new AutoValue_ParquetIO_ReadFiles.Builder()
+      .setSchema(schema)
+      .build();
+  }
+
+  /**
+   * Implementation of {@link #read(Schema)}.
+   */
+  @AutoValue
+  public abstract static class Read extends PTransform<PBegin, 
PCollection<GenericRecord>> {
+
+    @Nullable
+    abstract ValueProvider<String> getFilepattern();
+
+    @Nullable
+    abstract Schema getSchema();
+
+    abstract Builder builder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setFilepattern(ValueProvider<String> filepattern);
+      abstract Builder setSchema(Schema schema);
+
+      abstract Read build();
+    }
+
+    /**
+     * Reads from the given filename or filepattern.
+     */
+    public Read from(ValueProvider<String> filepattern) {
+      return builder().setFilepattern(filepattern).build();
+    }
+
+    /** Like {@link #from(ValueProvider)}. */
+    public Read from(String filepattern) {
+      return from(ValueProvider.StaticValueProvider.of(filepattern));
+    }
+
+    @Override
+    public PCollection<GenericRecord> expand(PBegin input) {
+      checkNotNull(getFilepattern(), "Filepattern cannot be null.");
+
+      return input
+        .apply("Create filepattern", Create.ofProvider(getFilepattern(), 
StringUtf8Coder.of()))
+        .apply(FileIO.matchAll())
+        .apply(FileIO.readMatches())
+        .apply(readFiles(getSchema()));
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+        .add(DisplayData.item("filePattern", 
getFilepattern()).withLabel("Input File Pattern"));
+    }
+  }
+
+  /**
+   * Implementation of {@link #readFiles(Schema)}.
+   */
+  @AutoValue
+  public abstract static class ReadFiles extends 
PTransform<PCollection<FileIO.ReadableFile>,
+      PCollection<GenericRecord>> {
+
+    @Nullable
+    abstract Schema getSchema();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setSchema(Schema schema);
+      abstract ReadFiles build();
+    }
+
+    @Override
+    public PCollection<GenericRecord> expand(PCollection<FileIO.ReadableFile> 
input) {
+      checkNotNull(getSchema(), "Schema can not be null");
+      return input.apply(ParDo.of(new 
ReadFn())).setCoder(AvroCoder.of(getSchema()));
+    }
+
+    static class ReadFn extends DoFn<FileIO.ReadableFile, GenericRecord> {
+
+      @ProcessElement
+      public void processElement(ProcessContext processContext) throws 
Exception {
+        FileIO.ReadableFile file = processContext.element();
+
+        if (!file.getMetadata().isReadSeekEfficient()) {
+          ResourceId filename = file.getMetadata().resourceId();
+          throw new RuntimeException(String.format("File has to be seekable: 
%s", filename));
+        }
+
+        SeekableByteChannel seekableByteChannel = file.openSeekable();
+
+        try (ParquetReader<GenericRecord> reader =
+                 AvroParquetReader
+                   .<GenericRecord>builder(new 
BeamParquetInputFile(seekableByteChannel))
+                   .build()) {
+          GenericRecord read;
+          while ((read = reader.read()) != null) {
+            processContext.output(read);
+          }
+        }
+      }
+    }
+
+    private static class BeamParquetInputFile implements InputFile {
+
+      private SeekableByteChannel seekableByteChannel;
+
+      BeamParquetInputFile(SeekableByteChannel seekableByteChannel) {
+        this.seekableByteChannel = seekableByteChannel;
+      }
+
+      @Override
+      public long getLength() throws IOException {
+        return seekableByteChannel.size();
+      }
+
+      @Override
+      public SeekableInputStream newStream() {
+        return new 
DelegatingSeekableInputStream(Channels.newInputStream(seekableByteChannel)) {
+
+          @Override
+          public long getPos() throws IOException {
+            return seekableByteChannel.position();
+          }
+
+          @Override
+          public void seek(long newPos) throws IOException {
+            seekableByteChannel.position(newPos);
+          }
+        };
+      }
+    }
+  }
+
+  /**
+   * Creates a {@link Sink} that, for use with {@link FileIO#write}.
+   */
+  public static Sink sink(Schema schema) {
+    return new AutoValue_ParquetIO_Sink.Builder()
+      .setJsonSchema(schema.toString())
+      .build();
+  }
+
+  /** Implementation of {@link #sink}. */
+  @AutoValue
+  public abstract static class Sink implements FileIO.Sink<GenericRecord> {
+
+    @Nullable
+    abstract String getJsonSchema();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setJsonSchema(String jsonSchema);
+      abstract Sink build();
+    }
+
+    @Nullable
+    private transient ParquetWriter<GenericRecord> writer;
+
+    @Override
+    public void open(WritableByteChannel channel) throws IOException {
+      checkNotNull(getJsonSchema(), "Schema cannot be null");
+
+      Schema schema = new Schema.Parser().parse(getJsonSchema());
+
+      BeamParquetOutputFile beamParquetOutputFile =
+        new BeamParquetOutputFile(Channels.newOutputStream(channel));
+
+      this.writer = 
AvroParquetWriter.<GenericRecord>builder(beamParquetOutputFile)
+        .withSchema(schema)
+        .withWriteMode(OVERWRITE)
+        .build();
+    }
+
+    @Override
+    public void write(GenericRecord element) throws IOException {
+      checkNotNull(writer, "Writer cannot be null");
+      writer.write(element);
+    }
+
+    @Override
+    public void flush() throws IOException {
+      writer.close();
+    }
+
+    private static class BeamParquetOutputFile implements OutputFile {
+
+      private OutputStream outputStream;
+
+      BeamParquetOutputFile(OutputStream outputStream) {
+        this.outputStream = outputStream;
+      }
+
+      @Override
+      public PositionOutputStream create(long blockSizeHint) {
+        return new BeamOutputStream(outputStream);
+      }
+
+      @Override
+      public PositionOutputStream createOrOverwrite(long blockSizeHint) {
+        return new BeamOutputStream(outputStream);
+      }
+
+      @Override
+      public boolean supportsBlockSize() {
+        return false;
+      }
+
+      @Override
+      public long defaultBlockSize() {
+        return 0;
+      }
+    }
+
+    private static class BeamOutputStream extends PositionOutputStream {
+      private long position = 0;
+      private OutputStream outputStream;
+
+      private BeamOutputStream(OutputStream outputStream) {
+        this.outputStream = outputStream;
+      }
+
+      @Override public long getPos() throws IOException {
+        return position;
+      }
+
+      @Override public void write(int b) throws IOException {
+        position++;
+        outputStream.write(b);
+      }
+
+      @Override
+      public void write(byte[] b) throws IOException {
+        write(b, 0, b.length);
+      }
+
+      @Override
+      public void write(byte[] b, int off, int len) throws IOException {
+        outputStream.write(b, off, len);
+        position += len;
+      }
+
+      @Override
+      public void flush() throws IOException {
+        outputStream.flush();
+      }
+
+      @Override
+      public void close() throws IOException {
+        outputStream.close();
+      }
+    }
+  }
+
+  /** Disallow construction of utility class. */
+  private ParquetIO() {}
+}
diff --git 
a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/package-info.java
 
b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/package-info.java
new file mode 100644
index 00000000000..e22d3c580bf
--- /dev/null
+++ 
b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Transforms for reading and writing from Parquet.
+ */
+package org.apache.beam.sdk.io.parquet;
diff --git 
a/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java
 
b/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java
new file mode 100644
index 00000000000..cc52b6d7cab
--- /dev/null
+++ 
b/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.parquet;
+
+import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Test on the {@link ParquetIO}.
+ */
+@RunWith(JUnit4.class)
+public class ParquetIOTest implements Serializable {
+
+  @Rule
+  public transient TestPipeline mainPipeline = TestPipeline.create();
+
+  @Rule
+  public transient TestPipeline readPipeline = TestPipeline.create();
+
+  @Rule
+  public transient TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  private static final String SCHEMA_STRING =
+    "{"
+      + "\"type\":\"record\", "
+      + "\"name\":\"testrecord\","
+      + "\"fields\":["
+      + "    {\"name\":\"name\",\"type\":\"string\"}"
+      + "  ]"
+      + "}";
+
+  private static final Schema SCHEMA = new 
Schema.Parser().parse(SCHEMA_STRING);
+
+  private static final String[] SCIENTISTS = new String[] {
+    "Einstein", "Darwin", "Copernicus", "Pasteur", "Curie",
+    "Faraday", "Newton", "Bohr", "Galilei", "Maxwell"
+  };
+
+  @Test
+  public void testWriteAndRead() {
+    List<GenericRecord> records = generateGenericRecords(1000);
+
+    mainPipeline
+      .apply(Create.of(records).withCoder(AvroCoder.of(SCHEMA)))
+      .apply(
+        FileIO.<GenericRecord>write()
+          .via(ParquetIO.sink(SCHEMA))
+          .to(temporaryFolder.getRoot().getAbsolutePath()));
+    mainPipeline.run().waitUntilFinish();
+
+    PCollection<GenericRecord> readBack =
+      readPipeline.apply(
+        ParquetIO.read(SCHEMA)
+          .from(temporaryFolder.getRoot().getAbsolutePath() + "/*"));
+
+    PAssert.that(readBack).containsInAnyOrder(records);
+    readPipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testWriteAndReadFiles() {
+    List<GenericRecord> records = generateGenericRecords(1000);
+
+    PCollection<GenericRecord> writeThenRead = mainPipeline
+      .apply(Create.of(records).withCoder(AvroCoder.of(SCHEMA)))
+      .apply(FileIO.<GenericRecord>
+        write()
+        .via(ParquetIO.sink(SCHEMA))
+        .to(temporaryFolder.getRoot().getAbsolutePath()))
+      .getPerDestinationOutputFilenames()
+      .apply(Values.create())
+      .apply(FileIO.matchAll())
+      .apply(FileIO.readMatches())
+      .apply(ParquetIO.readFiles(SCHEMA));
+
+    PAssert.that(writeThenRead).containsInAnyOrder(records);
+
+    mainPipeline.run().waitUntilFinish();
+  }
+
+  private List<GenericRecord> generateGenericRecords(long count) {
+    ArrayList<GenericRecord> data = new ArrayList<>();
+    GenericRecordBuilder builder = new GenericRecordBuilder(SCHEMA);
+    for (int i = 0; i < count; i++) {
+      int index = i % SCIENTISTS.length;
+      GenericRecord record = builder.set("name", SCIENTISTS[index]).build();
+      data.add(record);
+    }
+    return data;
+  }
+
+  @Test
+  public void testReadDisplayData() {
+    DisplayData displayData =
+      DisplayData.from(
+        ParquetIO.read(SCHEMA)
+          .from("foo.parquet"));
+
+    Assert.assertThat(displayData, hasDisplayItem("filePattern", 
"foo.parquet"));
+  }
+}
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index 1bc93a13fe6..622c2024703 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -61,6 +61,7 @@
     <module>kinesis</module>
     <module>mongodb</module>
     <module>mqtt</module>
+    <module>parquet</module>
     <module>redis</module>
     <module>solr</module>
     <module>tika</module>
diff --git a/settings.gradle b/settings.gradle
index f0703a32adb..73eeb3b1274 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -132,6 +132,8 @@ include "beam-sdks-java-io-mongodb"
 project(":beam-sdks-java-io-mongodb").dir = file("sdks/java/io/mongodb")
 include "beam-sdks-java-io-mqtt"
 project(":beam-sdks-java-io-mqtt").dir = file("sdks/java/io/mqtt")
+include "beam-sdks-java-io-parquet"
+project(":beam-sdks-java-io-parquet").dir = file("sdks/java/io/parquet")
 include "beam-sdks-java-io-redis"
 project(":beam-sdks-java-io-redis").dir = file("sdks/java/io/redis")
 include "beam-sdks-java-io-solr"


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 105158)
    Time Spent: 15h  (was: 14h 50m)

> Create Parquet IO
> -----------------
>
>                 Key: BEAM-214
>                 URL: https://issues.apache.org/jira/browse/BEAM-214
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-ideas
>            Reporter: Neville Li
>            Assignee: Jean-Baptiste Onofré
>            Priority: Minor
>          Time Spent: 15h
>  Remaining Estimate: 0h
>
> Would be nice to support Parquet files with projection and predicates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to