[FLINK-9753] [formats] Add a Parquet BulkWriter

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/66b1f854
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/66b1f854
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/66b1f854

Branch: refs/heads/master
Commit: 66b1f854a0250bdd048808d40f93aa2990476841
Parents: b56c75c
Author: Stephan Ewen <[email protected]>
Authored: Fri Jul 20 16:14:31 2018 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Fri Jul 20 16:26:52 2018 +0200

----------------------------------------------------------------------
 flink-formats/flink-parquet/pom.xml             | 160 ++++++
 .../flink/formats/parquet/ParquetBuilder.java   |  39 ++
 .../formats/parquet/ParquetBulkWriter.java      |  64 +++
 .../formats/parquet/ParquetWriterFactory.java   |  60 +++
 .../parquet/PositionOutputStreamAdapter.java    |  73 +++
 .../flink/formats/parquet/StreamOutputFile.java |  82 +++
 .../parquet/avro/ParquetAvroWriters.java        |  94 ++++
 .../avro/ParquetStreamingFileSinkITCase.java    | 241 +++++++++
 .../formats/parquet/generated/Address.java      | 517 +++++++++++++++++++
 .../parquet/testutils/FiniteTestSource.java     |  79 +++
 .../src/test/resources/avro/testdata.avsc       |  13 +
 .../src/test/resources/log4j-test.properties    |  23 +
 flink-formats/pom.xml                           |   1 +
 pom.xml                                         |   2 +
 tools/maven/suppressions.xml                    |   1 +
 tools/travis_mvn_watchdog.sh                    |   1 +
 16 files changed, 1450 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/66b1f854/flink-formats/flink-parquet/pom.xml
----------------------------------------------------------------------
diff --git a/flink-formats/flink-parquet/pom.xml 
b/flink-formats/flink-parquet/pom.xml
new file mode 100644
index 0000000..e338d05
--- /dev/null
+++ b/flink-formats/flink-parquet/pom.xml
@@ -0,0 +1,160 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-formats</artifactId>
+               <version>1.7-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-parquet</artifactId>
+       <name>flink-parquet</name>
+
+       <packaging>jar</packaging>
+
+       <properties>
+               <flink.format.parquet.version>1.10.0</flink.format.parquet.version>
+       </properties>
+
+       <dependencies>
+
+               <!-- Flink dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <!-- Parquet Dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.parquet</groupId>
+                       <artifactId>parquet-hadoop</artifactId>
+                       <version>${flink.format.parquet.version}</version>
+               </dependency>
+
+               <!-- Hadoop is needed by Parquet -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-shaded-hadoop2</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <!-- For now, fastutil is provided already by flink-runtime -->
+               <dependency>
+                       <groupId>it.unimi.dsi</groupId>
+                       <artifactId>fastutil</artifactId>
+                       <version>8.2.1</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <!-- Optional Parquet Builders for Formats like Avro, Protobuf, Thrift -->
+
+               <dependency>
+                       <groupId>org.apache.parquet</groupId>
+                       <artifactId>parquet-avro</artifactId>
+                       <version>${flink.format.parquet.version}</version>
+                       <optional>true</optional>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>org.apache.hadoop</groupId>
+                                       <artifactId>hadoop-client</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>it.unimi.dsi</groupId>
+                                       <artifactId>fastutil</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+               <!-- test dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils_2.11</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_2.11</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-avro</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+       </dependencies>
+
+
+       <build>
+               <plugins>
+                       <!-- Generate Test class from avro schema -->
+                       <plugin>
+                               <groupId>org.apache.avro</groupId>
+                               <artifactId>avro-maven-plugin</artifactId>
+                               <version>${avro.version}</version>
+                               <executions>
+                                       <execution>
+                                               <phase>generate-sources</phase>
+                                               <goals>
+                                                       <goal>schema</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <testSourceDirectory>${project.basedir}/src/test/resources/avro</testSourceDirectory>
+                                                       <testOutputDirectory>${project.basedir}/src/test/java/</testOutputDirectory>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+
+                       <!-- skip dependency convergence due to Hadoop dependency -->
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-enforcer-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <id>dependency-convergence</id>
+                                               <goals>
+                                                       <goal>enforce</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <skip>true</skip>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/66b1f854/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetBuilder.java
 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetBuilder.java
new file mode 100644
index 0000000..d3b1370
--- /dev/null
+++ 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetBuilder.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.io.OutputFile;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * A builder to create a {@link ParquetWriter} from a Parquet {@link OutputFile}.
+ *
+ * <p>The interface declares a single abstract method and extends {@link Serializable};
+ * a lambda used as a builder may therefore only capture serializable values.
+ *
+ * @param <T> The type of elements written by the writer.
+ */
+@FunctionalInterface
+public interface ParquetBuilder<T> extends Serializable {
+
+       /**
+        * Creates and configures a parquet writer to the given output file.
+        *
+        * @param out The Parquet output file that the created writer writes to.
+        * @return The configured Parquet writer.
+        * @throws IOException Declared so that implementations can propagate I/O failures
+        *                     that occur while setting up the writer.
+        */
+       ParquetWriter<T> createWriter(OutputFile out) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/66b1f854/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetBulkWriter.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetBulkWriter.java
 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetBulkWriter.java
new file mode 100644
index 0000000..77bdc5b
--- /dev/null
+++ 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetBulkWriter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.BulkWriter;
+
+import org.apache.parquet.hadoop.ParquetWriter;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link BulkWriter} that forwards every record to a wrapped {@link ParquetWriter}.
+ *
+ * @param <T> The type of records written.
+ */
+@PublicEvolving
+public class ParquetBulkWriter<T> implements BulkWriter<T> {
+
+       /** The wrapped Parquet writer that receives all records. */
+       private final ParquetWriter<T> writer;
+
+       /**
+        * Creates a new ParquetBulkWriter wrapping the given ParquetWriter.
+        *
+        * @param parquetWriter The ParquetWriter to write to; must not be null.
+        */
+       public ParquetBulkWriter(ParquetWriter<T> parquetWriter) {
+               checkNotNull(parquetWriter, "parquetWriter");
+               this.writer = parquetWriter;
+       }
+
+       /** Hands the record to the wrapped Parquet writer. */
+       @Override
+       public void addElement(T datum) throws IOException {
+               writer.write(datum);
+       }
+
+       /** Intentionally a no-op: no flush call is made on the wrapped writer. */
+       @Override
+       public void flush() {
+               // nothing we can do here
+       }
+
+       /** Finishes the writing by closing the wrapped Parquet writer. */
+       @Override
+       public void finish() throws IOException {
+               writer.close();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/66b1f854/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetWriterFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetWriterFactory.java
 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetWriterFactory.java
new file mode 100644
index 0000000..46245a4
--- /dev/null
+++ 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetWriterFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.FSDataOutputStream;
+
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.io.OutputFile;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A factory that creates a Parquet {@link BulkWriter}. The factory takes a user-supplied
+ * builder to assemble Parquet's writer and then turns it into a Flink {@code BulkWriter}.
+ *
+ * @param <T> The type of record to write.
+ */
+@PublicEvolving
+public class ParquetWriterFactory<T> implements BulkWriter.Factory<T> {
+
+       private static final long serialVersionUID = 1L;
+
+       /** The builder to construct the ParquetWriter. */
+       private final ParquetBuilder<T> writerBuilder;
+
+       /**
+        * Creates a new ParquetWriterFactory using the given builder to assemble the
+        * ParquetWriter.
+        *
+        * @param writerBuilder The builder to construct the ParquetWriter; must not be null.
+        */
+       public ParquetWriterFactory(ParquetBuilder<T> writerBuilder) {
+               // fail fast here rather than with a NullPointerException on the first create(...) call
+               this.writerBuilder = checkNotNull(writerBuilder, "writerBuilder");
+       }
+
+       /**
+        * Wraps the given stream into a Parquet {@link OutputFile}, lets the builder create the
+        * {@link ParquetWriter} for it, and exposes that writer as a {@link BulkWriter}.
+        *
+        * @param stream The stream that the Parquet data is written to.
+        * @return A bulk writer backed by the ParquetWriter that the builder created.
+        * @throws IOException Thrown if the builder fails to create the ParquetWriter.
+        */
+       @Override
+       public BulkWriter<T> create(FSDataOutputStream stream) throws IOException {
+               final OutputFile out = new StreamOutputFile(stream);
+               final ParquetWriter<T> writer = writerBuilder.createWriter(out);
+               return new ParquetBulkWriter<>(writer);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/66b1f854/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/PositionOutputStreamAdapter.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/PositionOutputStreamAdapter.java
 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/PositionOutputStreamAdapter.java
new file mode 100644
index 0000000..3949ba7
--- /dev/null
+++ 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/PositionOutputStreamAdapter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.FSDataOutputStream;
+
+import org.apache.parquet.io.PositionOutputStream;
+
+import java.io.IOException;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Exposes a Flink {@link FSDataOutputStream} through Parquet's {@link PositionOutputStream} API.
+ */
+@Internal
+class PositionOutputStreamAdapter extends PositionOutputStream {
+
+       /** The wrapped Flink stream that receives all bytes. */
+       private final FSDataOutputStream flinkStream;
+
+       /**
+        * Create a new PositionOutputStreamAdapter.
+        *
+        * @param out The Flink stream written to; must not be null.
+        */
+       PositionOutputStreamAdapter(FSDataOutputStream out) {
+               checkNotNull(out, "out");
+               this.flinkStream = out;
+       }
+
+       /** Forwards a single byte to the wrapped stream. */
+       @Override
+       public void write(int b) throws IOException {
+               flinkStream.write(b);
+       }
+
+       /** Forwards the given slice of the buffer to the wrapped stream. */
+       @Override
+       public void write(byte[] buffer, int off, int len) throws IOException {
+               flinkStream.write(buffer, off, len);
+       }
+
+       /** Flushes the wrapped stream. */
+       @Override
+       public void flush() throws IOException {
+               flinkStream.flush();
+       }
+
+       /** Reports the position of the wrapped stream. */
+       @Override
+       public long getPos() throws IOException {
+               return flinkStream.getPos();
+       }
+
+       /** Deliberately does nothing; the wrapped stream stays open. */
+       @Override
+       public void close() {
+               // we do not actually close the internal stream here, to prevent that the finishing
+               // of the Parquet Writer closes the target output stream
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/66b1f854/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/StreamOutputFile.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/StreamOutputFile.java
 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/StreamOutputFile.java
new file mode 100644
index 0000000..70a8557
--- /dev/null
+++ 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/StreamOutputFile.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.FSDataOutputStream;
+
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.PositionOutputStream;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A Parquet {@link OutputFile} that is backed by an already opened Flink
+ * {@link FSDataOutputStream}.
+ *
+ * <p>Since the file does not open streams of its own but hands out the one stream it was
+ * given, each instance can create exactly one stream.
+ */
+@Internal
+class StreamOutputFile implements OutputFile {
+
+       private static final long DEFAULT_BLOCK_SIZE = 64L * 1024L * 1024L;
+
+       /** The Flink stream that the single created Parquet stream writes to. */
+       private final FSDataOutputStream target;
+
+       /** Set to true once the one permitted stream has been handed out. */
+       private final AtomicBoolean streamCreated;
+
+       /**
+        * Creates a new StreamOutputFile. The first call to {@link #create(long)}
+        * or {@link #createOrOverwrite(long)} returns a stream that writes to the given stream.
+        *
+        * @param stream The stream to write to.
+        */
+       StreamOutputFile(FSDataOutputStream stream) {
+               this.streamCreated = new AtomicBoolean(false);
+               this.target = checkNotNull(stream);
+       }
+
+       /**
+        * Returns the adapter around the wrapped stream on the first call.
+        *
+        * @param blockSizeHint Ignored by this implementation.
+        * @throws IllegalStateException If a stream was already created from this file.
+        */
+       @Override
+       public PositionOutputStream create(long blockSizeHint) {
+               // atomically flip the flag; only the very first caller observes 'false'
+               final boolean firstUse = streamCreated.compareAndSet(false, true);
+               if (!firstUse) {
+                       throw new IllegalStateException("A stream against this file was already created.");
+               }
+               return new PositionOutputStreamAdapter(target);
+       }
+
+       /** Same as {@link #create(long)}; there is no existing file that could be overwritten. */
+       @Override
+       public PositionOutputStream createOrOverwrite(long blockSizeHint) {
+               return create(blockSizeHint);
+       }
+
+       /** Always false. */
+       @Override
+       public boolean supportsBlockSize() {
+               return false;
+       }
+
+       /** Returns the fixed default of 64 MiB. */
+       @Override
+       public long defaultBlockSize() {
+               return DEFAULT_BLOCK_SIZE;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/66b1f854/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/ParquetAvroWriters.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/ParquetAvroWriters.java
 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/ParquetAvroWriters.java
new file mode 100644
index 0000000..87ddfdc
--- /dev/null
+++ 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/ParquetAvroWriters.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.avro;
+
+import org.apache.flink.formats.parquet.ParquetBuilder;
+import org.apache.flink.formats.parquet.ParquetWriterFactory;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.io.OutputFile;
+
+import java.io.IOException;
+
+/**
+ * Static factory methods that create {@link ParquetWriterFactory} instances for the different Avro types.
+ *
+ * <p>Each factory method converts the Avro schema to its String form up front. The builder
+ * lambda captures only that String and the schema is parsed again whenever a writer is
+ * created (presumably because the {@link Schema} object should not be captured by the
+ * serializable builder; to be confirmed).
+ */
+public class ParquetAvroWriters {
+
+       /**
+        * Creates a ParquetWriterFactory for an Avro specific type. The Parquet writers will use the
+        * schema of that specific type to build and write the columnar data.
+        *
+        * @param type The class of the type to write.
+        */
+       public static <T extends SpecificRecordBase> ParquetWriterFactory<T> forSpecificRecord(Class<T> type) {
+               final Schema specificSchema = SpecificData.get().getSchema(type);
+               final String schemaJson = specificSchema.toString();
+               final ParquetBuilder<T> writerBuilder =
+                               out -> createAvroParquetWriter(schemaJson, SpecificData.get(), out);
+               return new ParquetWriterFactory<>(writerBuilder);
+       }
+
+       /**
+        * Creates a ParquetWriterFactory that accepts and writes Avro generic types.
+        * The Parquet writers will use the given schema to build and write the columnar data.
+        *
+        * @param schema The schema of the generic type.
+        */
+       public static ParquetWriterFactory<GenericRecord> forGenericRecord(Schema schema) {
+               final String schemaJson = schema.toString();
+               final ParquetBuilder<GenericRecord> writerBuilder =
+                               out -> createAvroParquetWriter(schemaJson, GenericData.get(), out);
+               return new ParquetWriterFactory<>(writerBuilder);
+       }
+
+       /**
+        * Creates a ParquetWriterFactory for the given type. The Parquet writers will use Avro
+        * to reflectively create a schema for the type and use that schema to write the columnar data.
+        *
+        * @param type The class of the type to write.
+        */
+       public static <T> ParquetWriterFactory<T> forReflectRecord(Class<T> type) {
+               final Schema reflectSchema = ReflectData.get().getSchema(type);
+               final String schemaJson = reflectSchema.toString();
+               final ParquetBuilder<T> writerBuilder =
+                               out -> createAvroParquetWriter(schemaJson, ReflectData.get(), out);
+               return new ParquetWriterFactory<>(writerBuilder);
+       }
+
+       /**
+        * Parses the schema String and builds an Avro Parquet writer for the given output file
+        * that uses the parsed schema and the given Avro data model.
+        */
+       private static <T> ParquetWriter<T> createAvroParquetWriter(
+                       String schemaString,
+                       GenericData dataModel,
+                       OutputFile out) throws IOException {
+
+               final Schema.Parser parser = new Schema.Parser();
+               final Schema parsedSchema = parser.parse(schemaString);
+
+               return AvroParquetWriter.<T>builder(out)
+                               .withSchema(parsedSchema)
+                               .withDataModel(dataModel)
+                               .build();
+       }
+
+       // ------------------------------------------------------------------------
+
+       /** Class is not meant to be instantiated. */
+       private ParquetAvroWriters() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/66b1f854/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
 
b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
new file mode 100644
index 0000000..d1f0a5f
--- /dev/null
+++ 
b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.avro;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.parquet.generated.Address;
+import org.apache.flink.formats.parquet.testutils.FiniteTestSource;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.specific.SpecificData;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.AbstractCollection;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Simple integration test case for writing bulk encoded files with the
+ * {@link StreamingFileSink} with Parquet.
+ *
+ * <p>Each test runs a small checkpointed streaming job (parallelism 1) that writes a finite
+ * data set into a temporary folder, and then reads the resulting part file back with an
+ * Avro Parquet reader to compare it against the expected records.
+ */
+@SuppressWarnings("serial")
+public class ParquetStreamingFileSinkITCase extends AbstractTestBase {
+
+       /** Writes Avro specific records ({@link Address}) and reads them back with the specific data model. */
+       @Test
+       public void testWriteParquetAvroSpecific() throws Exception {
+
+               final File folder = TEMPORARY_FOLDER.newFolder();
+
+               final List<Address> data = Arrays.asList(
+                               new Address(1, "a", "b", "c", "12345"),
+                               new Address(2, "p", "q", "r", "12345"),
+                               new Address(3, "x", "y", "z", "12345")
+               );
+
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(1);
+               env.enableCheckpointing(100);
+
+               DataStream<Address> stream = env.addSource(
+                               new FiniteTestSource<>(data), TypeInformation.of(Address.class));
+
+               stream.addSink(
+                               StreamingFileSink.forBulkFormat(
+                                               Path.fromLocalFile(folder),
+                                               ParquetAvroWriters.forSpecificRecord(Address.class))
+                               .build());
+
+               env.execute();
+
+               validateResults(folder, SpecificData.get(), data);
+       }
+
+       /** Writes Avro generic records that use the {@link Address} schema. */
+       @Test
+       public void testWriteParquetAvroGeneric() throws Exception {
+
+               final File folder = TEMPORARY_FOLDER.newFolder();
+
+               final Schema schema = Address.getClassSchema();
+
+               final Collection<GenericRecord> data = new GenericTestDataCollection();
+
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(1);
+               env.enableCheckpointing(100);
+
+               DataStream<GenericRecord> stream = env.addSource(
+                               new FiniteTestSource<>(data), new GenericRecordAvroTypeInfo(schema));
+
+               stream.addSink(
+                               StreamingFileSink.forBulkFormat(
+                                               Path.fromLocalFile(folder),
+                                               ParquetAvroWriters.forGenericRecord(schema))
+                                               .build());
+
+               env.execute();
+
+               // The file is read back with the specific data model, so the expected values are
+               // expressed as Address instances; they mirror the two records that
+               // GenericTestDataCollection produces.
+               List<Address> expected = Arrays.asList(
+                               new Address(1, "a", "b", "c", "12345"),
+                               new Address(2, "x", "y", "z", "98765"));
+
+               validateResults(folder, SpecificData.get(), expected);
+       }
+
+       /** Writes plain Java objects ({@link Datum}) via a reflectively created Avro schema. */
+       @Test
+       public void testWriteParquetAvroReflect() throws Exception {
+
+               final File folder = TEMPORARY_FOLDER.newFolder();
+
+               final List<Datum> data = Arrays.asList(
+                               new Datum("a", 1), new Datum("b", 2), new Datum("c", 3));
+
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(1);
+               env.enableCheckpointing(100);
+
+               DataStream<Datum> stream = env.addSource(
+                               new FiniteTestSource<>(data), TypeInformation.of(Datum.class));
+
+               stream.addSink(
+                               StreamingFileSink.forBulkFormat(
+                                               Path.fromLocalFile(folder),
+                                               ParquetAvroWriters.forReflectRecord(Datum.class))
+                                               .build());
+
+               env.execute();
+
+               validateResults(folder, ReflectData.get(), data);
+       }
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Asserts that the given folder contains exactly one bucket directory which in turn holds
+        * exactly one non-empty part file, and that the records in that file equal the expected list.
+        *
+        * @param folder The base folder the sink wrote to.
+        * @param dataModel The Avro data model used to read the records back.
+        * @param expected The records expected in the part file, in order.
+        */
+       private static <T> void validateResults(File folder, GenericData dataModel, List<T> expected) throws Exception {
+               File[] buckets = folder.listFiles();
+               assertNotNull(buckets);
+               assertEquals(1, buckets.length);
+
+               File[] partFiles = buckets[0].listFiles();
+               assertNotNull(partFiles);
+               assertEquals(1, partFiles.length);
+               assertTrue(partFiles[0].length() > 0);
+
+               // Typed with T (not Address) so the helper stays type-correct for every caller.
+               List<T> results = readParquetFile(partFiles[0], dataModel);
+               assertEquals(expected, results);
+       }
+
+       /**
+        * Reads all records of a Parquet file into a list.
+        *
+        * @param file The Parquet file to read.
+        * @param dataModel The Avro data model used to materialize the records.
+        * @return All records of the file, in the order the reader returned them.
+        * @throws IOException If opening, reading or closing the file fails.
+        */
+       private static <T> List<T> readParquetFile(File file, GenericData dataModel) throws IOException {
+               InputFile inFile = HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(file.toURI()), new Configuration());
+               ArrayList<T> results = new ArrayList<>();
+
+               // try-with-resources: the reader holds an open file and must be closed, also on failure.
+               try (ParquetReader<T> reader = AvroParquetReader.<T>builder(inFile).withDataModel(dataModel).build()) {
+                       T next;
+                       // read() returns null once the end of the file is reached
+                       while ((next = reader.read()) != null) {
+                               results.add(next);
+                       }
+               }
+
+               return results;
+       }
+
+       /**
+        * A serializable two-element collection of generic records with the {@link Address} schema.
+        *
+        * <p>The records are created inside {@link #iterator()} instead of being stored in fields.
+        * NOTE(review): presumably because {@code GenericData.Record} is not {@link Serializable} - confirm.
+        */
+       private static class GenericTestDataCollection extends AbstractCollection<GenericRecord> implements Serializable {
+
+               @Override
+               public Iterator<GenericRecord> iterator() {
+                       final GenericRecord rec1 = new GenericData.Record(Address.getClassSchema());
+                       rec1.put(0, 1);
+                       rec1.put(1, "a");
+                       rec1.put(2, "b");
+                       rec1.put(3, "c");
+                       rec1.put(4, "12345");
+
+                       final GenericRecord rec2 = new GenericData.Record(Address.getClassSchema());
+                       rec2.put(0, 2);
+                       rec2.put(1, "x");
+                       rec2.put(2, "y");
+                       rec2.put(3, "z");
+                       rec2.put(4, "98765");
+
+                       return Arrays.asList(rec1, rec2).iterator();
+               }
+
+               @Override
+               public int size() {
+                       // must match the number of records created in iterator()
+                       return 2;
+               }
+       }
+
+       // ------------------------------------------------------------------------
+
+       /** Test datum. A simple serializable type with public fields, a no-arg constructor and value equality. */
+       public static class Datum implements Serializable {
+
+               public String a;
+               public int b;
+
+               public Datum() {}
+
+               public Datum(String a, int b) {
+                       this.a = a;
+                       this.b = b;
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+
+                       Datum datum = (Datum) o;
+                       return b == datum.b && (a != null ? a.equals(datum.a) : datum.a == null);
+               }
+
+               @Override
+               public int hashCode() {
+                       int result = a != null ? a.hashCode() : 0;
+                       result = 31 * result + b;
+                       return result;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/66b1f854/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Address.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Address.java
 
b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Address.java
new file mode 100644
index 0000000..ca8f55f
--- /dev/null
+++ 
b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Address.java
@@ -0,0 +1,517 @@
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.flink.formats.parquet.generated;
+
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.message.BinaryMessageEncoder;
+import org.apache.avro.message.BinaryMessageDecoder;
+import org.apache.avro.message.SchemaStore;
+
+@SuppressWarnings("all")
[email protected]
+public class Address extends org.apache.avro.specific.SpecificRecordBase 
implements org.apache.avro.specific.SpecificRecord {
+  private static final long serialVersionUID = -7342141701041388589L;
+  public static final org.apache.avro.Schema SCHEMA$ = new 
org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Address\",\"namespace\":\"org.apache.flink.formats.parquet.generated\",\"fields\":[{\"name\":\"num\",\"type\":\"int\"},{\"name\":\"street\",\"type\":\"string\"},{\"name\":\"city\",\"type\":\"string\"},{\"name\":\"state\",\"type\":\"string\"},{\"name\":\"zip\",\"type\":\"string\"}]}");
+  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+
+  private static SpecificData MODEL$ = new SpecificData();
+
+  private static final BinaryMessageEncoder<Address> ENCODER =
+      new BinaryMessageEncoder<Address>(MODEL$, SCHEMA$);
+
+  private static final BinaryMessageDecoder<Address> DECODER =
+      new BinaryMessageDecoder<Address>(MODEL$, SCHEMA$);
+
+  /**
+   * Return the BinaryMessageDecoder instance used by this class.
+   */
+  public static BinaryMessageDecoder<Address> getDecoder() {
+    return DECODER;
+  }
+
+  /**
+   * Create a new BinaryMessageDecoder instance for this class that uses the 
specified {@link SchemaStore}.
+   * @param resolver a {@link SchemaStore} used to find schemas by fingerprint
+   */
+  public static BinaryMessageDecoder<Address> createDecoder(SchemaStore 
resolver) {
+    return new BinaryMessageDecoder<Address>(MODEL$, SCHEMA$, resolver);
+  }
+
+  /** Serializes this Address to a ByteBuffer. */
+  public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException {
+    return ENCODER.encode(this);
+  }
+
+  /** Deserializes a Address from a ByteBuffer. */
+  public static Address fromByteBuffer(
+      java.nio.ByteBuffer b) throws java.io.IOException {
+    return DECODER.decode(b);
+  }
+
+  @Deprecated public int num;
+  @Deprecated public java.lang.CharSequence street;
+  @Deprecated public java.lang.CharSequence city;
+  @Deprecated public java.lang.CharSequence state;
+  @Deprecated public java.lang.CharSequence zip;
+
+  /**
+   * Default constructor.  Note that this does not initialize fields
+   * to their default values from the schema.  If that is desired then
+   * one should use <code>newBuilder()</code>.
+   */
+  public Address() {}
+
+  /**
+   * All-args constructor.
+   * @param num The new value for num
+   * @param street The new value for street
+   * @param city The new value for city
+   * @param state The new value for state
+   * @param zip The new value for zip
+   */
+  public Address(java.lang.Integer num, java.lang.CharSequence street, 
java.lang.CharSequence city, java.lang.CharSequence state, 
java.lang.CharSequence zip) {
+    this.num = num;
+    this.street = street;
+    this.city = city;
+    this.state = state;
+    this.zip = zip;
+  }
+
+  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+  // Used by DatumWriter.  Applications should not call.
+  public java.lang.Object get(int field$) {
+    switch (field$) {
+    case 0: return num;
+    case 1: return street;
+    case 2: return city;
+    case 3: return state;
+    case 4: return zip;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+
+  // Used by DatumReader.  Applications should not call.
+  @SuppressWarnings(value="unchecked")
+  public void put(int field$, java.lang.Object value$) {
+    switch (field$) {
+    case 0: num = (java.lang.Integer)value$; break;
+    case 1: street = (java.lang.CharSequence)value$; break;
+    case 2: city = (java.lang.CharSequence)value$; break;
+    case 3: state = (java.lang.CharSequence)value$; break;
+    case 4: zip = (java.lang.CharSequence)value$; break;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+
+  /**
+   * Gets the value of the 'num' field.
+   * @return The value of the 'num' field.
+   */
+  public java.lang.Integer getNum() {
+    return num;
+  }
+
+  /**
+   * Sets the value of the 'num' field.
+   * @param value the value to set.
+   */
+  public void setNum(java.lang.Integer value) {
+    this.num = value;
+  }
+
+  /**
+   * Gets the value of the 'street' field.
+   * @return The value of the 'street' field.
+   */
+  public java.lang.CharSequence getStreet() {
+    return street;
+  }
+
+  /**
+   * Sets the value of the 'street' field.
+   * @param value the value to set.
+   */
+  public void setStreet(java.lang.CharSequence value) {
+    this.street = value;
+  }
+
+  /**
+   * Gets the value of the 'city' field.
+   * @return The value of the 'city' field.
+   */
+  public java.lang.CharSequence getCity() {
+    return city;
+  }
+
+  /**
+   * Sets the value of the 'city' field.
+   * @param value the value to set.
+   */
+  public void setCity(java.lang.CharSequence value) {
+    this.city = value;
+  }
+
+  /**
+   * Gets the value of the 'state' field.
+   * @return The value of the 'state' field.
+   */
+  public java.lang.CharSequence getState() {
+    return state;
+  }
+
+  /**
+   * Sets the value of the 'state' field.
+   * @param value the value to set.
+   */
+  public void setState(java.lang.CharSequence value) {
+    this.state = value;
+  }
+
+  /**
+   * Gets the value of the 'zip' field.
+   * @return The value of the 'zip' field.
+   */
+  public java.lang.CharSequence getZip() {
+    return zip;
+  }
+
+  /**
+   * Sets the value of the 'zip' field.
+   * @param value the value to set.
+   */
+  public void setZip(java.lang.CharSequence value) {
+    this.zip = value;
+  }
+
+  /**
+   * Creates a new Address RecordBuilder.
+   * @return A new Address RecordBuilder
+   */
+  public static org.apache.flink.formats.parquet.generated.Address.Builder 
newBuilder() {
+    return new org.apache.flink.formats.parquet.generated.Address.Builder();
+  }
+
+  /**
+   * Creates a new Address RecordBuilder by copying an existing Builder.
+   * @param other The existing builder to copy.
+   * @return A new Address RecordBuilder
+   */
+  public static org.apache.flink.formats.parquet.generated.Address.Builder 
newBuilder(org.apache.flink.formats.parquet.generated.Address.Builder other) {
+    return new 
org.apache.flink.formats.parquet.generated.Address.Builder(other);
+  }
+
+  /**
+   * Creates a new Address RecordBuilder by copying an existing Address 
instance.
+   * @param other The existing instance to copy.
+   * @return A new Address RecordBuilder
+   */
+  public static org.apache.flink.formats.parquet.generated.Address.Builder 
newBuilder(org.apache.flink.formats.parquet.generated.Address other) {
+    return new 
org.apache.flink.formats.parquet.generated.Address.Builder(other);
+  }
+
+  /**
+   * RecordBuilder for Address instances.
+   */
+  public static class Builder extends 
org.apache.avro.specific.SpecificRecordBuilderBase<Address>
+    implements org.apache.avro.data.RecordBuilder<Address> {
+
+    private int num;
+    private java.lang.CharSequence street;
+    private java.lang.CharSequence city;
+    private java.lang.CharSequence state;
+    private java.lang.CharSequence zip;
+
+    /** Creates a new Builder */
+    private Builder() {
+      super(SCHEMA$);
+    }
+
+    /**
+     * Creates a Builder by copying an existing Builder.
+     * @param other The existing Builder to copy.
+     */
+    private Builder(org.apache.flink.formats.parquet.generated.Address.Builder 
other) {
+      super(other);
+      if (isValidValue(fields()[0], other.num)) {
+        this.num = data().deepCopy(fields()[0].schema(), other.num);
+        fieldSetFlags()[0] = true;
+      }
+      if (isValidValue(fields()[1], other.street)) {
+        this.street = data().deepCopy(fields()[1].schema(), other.street);
+        fieldSetFlags()[1] = true;
+      }
+      if (isValidValue(fields()[2], other.city)) {
+        this.city = data().deepCopy(fields()[2].schema(), other.city);
+        fieldSetFlags()[2] = true;
+      }
+      if (isValidValue(fields()[3], other.state)) {
+        this.state = data().deepCopy(fields()[3].schema(), other.state);
+        fieldSetFlags()[3] = true;
+      }
+      if (isValidValue(fields()[4], other.zip)) {
+        this.zip = data().deepCopy(fields()[4].schema(), other.zip);
+        fieldSetFlags()[4] = true;
+      }
+    }
+
+    /**
+     * Creates a Builder by copying an existing Address instance
+     * @param other The existing instance to copy.
+     */
+    private Builder(org.apache.flink.formats.parquet.generated.Address other) {
+            super(SCHEMA$);
+      if (isValidValue(fields()[0], other.num)) {
+        this.num = data().deepCopy(fields()[0].schema(), other.num);
+        fieldSetFlags()[0] = true;
+      }
+      if (isValidValue(fields()[1], other.street)) {
+        this.street = data().deepCopy(fields()[1].schema(), other.street);
+        fieldSetFlags()[1] = true;
+      }
+      if (isValidValue(fields()[2], other.city)) {
+        this.city = data().deepCopy(fields()[2].schema(), other.city);
+        fieldSetFlags()[2] = true;
+      }
+      if (isValidValue(fields()[3], other.state)) {
+        this.state = data().deepCopy(fields()[3].schema(), other.state);
+        fieldSetFlags()[3] = true;
+      }
+      if (isValidValue(fields()[4], other.zip)) {
+        this.zip = data().deepCopy(fields()[4].schema(), other.zip);
+        fieldSetFlags()[4] = true;
+      }
+    }
+
+    /**
+      * Gets the value of the 'num' field.
+      * @return The value.
+      */
+    public java.lang.Integer getNum() {
+      return num;
+    }
+
+    /**
+      * Sets the value of the 'num' field.
+      * @param value The value of 'num'.
+      * @return This builder.
+      */
+    public org.apache.flink.formats.parquet.generated.Address.Builder 
setNum(int value) {
+      validate(fields()[0], value);
+      this.num = value;
+      fieldSetFlags()[0] = true;
+      return this;
+    }
+
+    /**
+      * Checks whether the 'num' field has been set.
+      * @return True if the 'num' field has been set, false otherwise.
+      */
+    public boolean hasNum() {
+      return fieldSetFlags()[0];
+    }
+
+
+    /**
+      * Clears the value of the 'num' field.
+      * @return This builder.
+      */
+    public org.apache.flink.formats.parquet.generated.Address.Builder 
clearNum() {
+      fieldSetFlags()[0] = false;
+      return this;
+    }
+
+    /**
+      * Gets the value of the 'street' field.
+      * @return The value.
+      */
+    public java.lang.CharSequence getStreet() {
+      return street;
+    }
+
+    /**
+      * Sets the value of the 'street' field.
+      * @param value The value of 'street'.
+      * @return This builder.
+      */
+    public org.apache.flink.formats.parquet.generated.Address.Builder 
setStreet(java.lang.CharSequence value) {
+      validate(fields()[1], value);
+      this.street = value;
+      fieldSetFlags()[1] = true;
+      return this;
+    }
+
+    /**
+      * Checks whether the 'street' field has been set.
+      * @return True if the 'street' field has been set, false otherwise.
+      */
+    public boolean hasStreet() {
+      return fieldSetFlags()[1];
+    }
+
+
+    /**
+      * Clears the value of the 'street' field.
+      * @return This builder.
+      */
+    public org.apache.flink.formats.parquet.generated.Address.Builder 
clearStreet() {
+      street = null;
+      fieldSetFlags()[1] = false;
+      return this;
+    }
+
+    /**
+      * Gets the value of the 'city' field.
+      * @return The value.
+      */
+    public java.lang.CharSequence getCity() {
+      return city;
+    }
+
+    /**
+      * Sets the value of the 'city' field.
+      * @param value The value of 'city'.
+      * @return This builder.
+      */
+    public org.apache.flink.formats.parquet.generated.Address.Builder 
setCity(java.lang.CharSequence value) {
+      validate(fields()[2], value);
+      this.city = value;
+      fieldSetFlags()[2] = true;
+      return this;
+    }
+
+    /**
+      * Checks whether the 'city' field has been set.
+      * @return True if the 'city' field has been set, false otherwise.
+      */
+    public boolean hasCity() {
+      return fieldSetFlags()[2];
+    }
+
+
+    /**
+      * Clears the value of the 'city' field.
+      * @return This builder.
+      */
+    public org.apache.flink.formats.parquet.generated.Address.Builder 
clearCity() {
+      city = null;
+      fieldSetFlags()[2] = false;
+      return this;
+    }
+
+    /**
+      * Gets the value of the 'state' field.
+      * @return The value.
+      */
+    public java.lang.CharSequence getState() {
+      return state;
+    }
+
+    /**
+      * Sets the value of the 'state' field.
+      * @param value The value of 'state'.
+      * @return This builder.
+      */
+    public org.apache.flink.formats.parquet.generated.Address.Builder 
setState(java.lang.CharSequence value) {
+      validate(fields()[3], value);
+      this.state = value;
+      fieldSetFlags()[3] = true;
+      return this;
+    }
+
+    /**
+      * Checks whether the 'state' field has been set.
+      * @return True if the 'state' field has been set, false otherwise.
+      */
+    public boolean hasState() {
+      return fieldSetFlags()[3];
+    }
+
+
+    /**
+      * Clears the value of the 'state' field.
+      * @return This builder.
+      */
+    public org.apache.flink.formats.parquet.generated.Address.Builder 
clearState() {
+      state = null;
+      fieldSetFlags()[3] = false;
+      return this;
+    }
+
+    /**
+      * Gets the value of the 'zip' field.
+      * @return The value.
+      */
+    public java.lang.CharSequence getZip() {
+      return zip;
+    }
+
+    /**
+      * Sets the value of the 'zip' field.
+      * @param value The value of 'zip'.
+      * @return This builder.
+      */
+    public org.apache.flink.formats.parquet.generated.Address.Builder 
setZip(java.lang.CharSequence value) {
+      validate(fields()[4], value);
+      this.zip = value;
+      fieldSetFlags()[4] = true;
+      return this;
+    }
+
+    /**
+      * Checks whether the 'zip' field has been set.
+      * @return True if the 'zip' field has been set, false otherwise.
+      */
+    public boolean hasZip() {
+      return fieldSetFlags()[4];
+    }
+
+
+    /**
+      * Clears the value of the 'zip' field.
+      * @return This builder.
+      */
+    public org.apache.flink.formats.parquet.generated.Address.Builder 
clearZip() {
+      zip = null;
+      fieldSetFlags()[4] = false;
+      return this;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Address build() {
+      try {
+        Address record = new Address();
+        record.num = fieldSetFlags()[0] ? this.num : (java.lang.Integer) 
defaultValue(fields()[0]);
+        record.street = fieldSetFlags()[1] ? this.street : 
(java.lang.CharSequence) defaultValue(fields()[1]);
+        record.city = fieldSetFlags()[2] ? this.city : 
(java.lang.CharSequence) defaultValue(fields()[2]);
+        record.state = fieldSetFlags()[3] ? this.state : 
(java.lang.CharSequence) defaultValue(fields()[3]);
+        record.zip = fieldSetFlags()[4] ? this.zip : (java.lang.CharSequence) 
defaultValue(fields()[4]);
+        return record;
+      } catch (java.lang.Exception e) {
+        throw new org.apache.avro.AvroRuntimeException(e);
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private static final org.apache.avro.io.DatumWriter<Address>
+    WRITER$ = 
(org.apache.avro.io.DatumWriter<Address>)MODEL$.createDatumWriter(SCHEMA$);
+
+  @Override public void writeExternal(java.io.ObjectOutput out)
+    throws java.io.IOException {
+    WRITER$.write(this, SpecificData.getEncoder(out));
+  }
+
+  @SuppressWarnings("unchecked")
+  private static final org.apache.avro.io.DatumReader<Address>
+    READER$ = 
(org.apache.avro.io.DatumReader<Address>)MODEL$.createDatumReader(SCHEMA$);
+
+  @Override public void readExternal(java.io.ObjectInput in)
+    throws java.io.IOException {
+    READER$.read(this, SpecificData.getDecoder(in));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/66b1f854/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/testutils/FiniteTestSource.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/testutils/FiniteTestSource.java
 
b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/testutils/FiniteTestSource.java
new file mode 100644
index 0000000..03db7ff
--- /dev/null
+++ 
b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/testutils/FiniteTestSource.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.testutils;
+
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.util.Arrays;
+
+/**
+ * A stream source that emits elements without allowing checkpoints and waits
+ * for two more checkpoints to complete before exiting.
+ *
+ * <p>All elements are emitted in one go while the checkpoint lock is held. After that,
+ * {@link #run(SourceContext)} blocks until the completed-checkpoint counter has advanced
+ * by two, or until the source is cancelled.
+ */
+@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+public class FiniteTestSource<T> implements SourceFunction<T>, 
CheckpointListener {
+
+       private static final long serialVersionUID = 1L;
+
+       // The elements to emit. Declared as Iterable, so serializability is not enforced by the type;
+       // NOTE(review): the instance passed in presumably has to be serializable at runtime - confirm.
+       @SuppressWarnings("NonSerializableFieldInSerializableClass")
+       private final Iterable<T> elements;
+
+       // Cleared by cancel(); checked only in the wait loop of run().
+       private volatile boolean running = true;
+
+       // Number of checkpoint-complete notifications received so far. Transient, so it is
+       // not part of the serialized source and starts at 0 after deserialization.
+       private transient int numCheckpointsComplete;
+
+       /**
+        * Creates a source that emits the given elements.
+        *
+        * @param elements The elements to emit, in order.
+        */
+       @SafeVarargs
+       public FiniteTestSource(T... elements) {
+               this(Arrays.asList(elements));
+       }
+
+       /**
+        * Creates a source that emits the elements of the given iterable.
+        *
+        * @param elements The elements to emit, in iteration order.
+        */
+       public FiniteTestSource(Iterable<T> elements) {
+               this.elements = elements;
+       }
+
+       /**
+        * Emits all elements under the checkpoint lock, then waits until two further
+        * checkpoints have been reported complete or the source has been cancelled.
+        */
+       @Override
+       public void run(SourceContext<T> ctx) throws Exception {
+               final Object lock = ctx.getCheckpointLock();
+               final int checkpointToAwait;
+
+               synchronized (lock) {
+                       // Fix the target before emitting: two completions beyond the current count.
+                       checkpointToAwait = numCheckpointsComplete + 2;
+                       for (T t : elements) {
+                               ctx.collect(t);
+                       }
+               }
+
+               synchronized (lock) {
+                       while (running && numCheckpointsComplete < 
checkpointToAwait) {
+                               // wait() releases the lock while waiting; the 1 ms timeout makes the
+                               // loop re-check its condition without requiring a notify().
+                               lock.wait(1);
+                       }
+               }
+       }
+
+       /** Requests termination; the wait loop in {@link #run(SourceContext)} exits on its next check. */
+       @Override
+       public void cancel() {
+               running = false;
+       }
+
+       /**
+        * Counts a completed checkpoint; the checkpoint id itself is not used.
+        *
+        * <p>NOTE(review): the counter is incremented without synchronization in this method;
+        * presumably the caller already holds the checkpoint lock - confirm.
+        */
+       @Override
+       public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
+               numCheckpointsComplete++;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/66b1f854/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc
----------------------------------------------------------------------
diff --git a/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc 
b/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc
new file mode 100644
index 0000000..c781900
--- /dev/null
+++ b/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc
@@ -0,0 +1,13 @@
+[
+{"namespace": "org.apache.flink.formats.parquet.generated",
+ "type": "record",
+ "name": "Address",
+ "fields": [
+     {"name": "num", "type": "int"},
+     {"name": "street", "type": "string"},
+     {"name": "city", "type": "string"},
+     {"name": "state", "type": "string"},
+     {"name": "zip", "type": "string"}
+  ]
+}
+]

http://git-wip-us.apache.org/repos/asf/flink/blob/66b1f854/flink-formats/flink-parquet/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-parquet/src/test/resources/log4j-test.properties 
b/flink-formats/flink-parquet/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2a30ab8
--- /dev/null
+++ b/flink-formats/flink-parquet/src/test/resources/log4j-test.properties
@@ -0,0 +1,23 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=OFF, testlogger
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/66b1f854/flink-formats/pom.xml
----------------------------------------------------------------------
diff --git a/flink-formats/pom.xml b/flink-formats/pom.xml
index 282a2cf..c17ca58 100644
--- a/flink-formats/pom.xml
+++ b/flink-formats/pom.xml
@@ -39,6 +39,7 @@ under the License.
                <module>flink-avro</module>
                <module>flink-json</module>
                <module>flink-avro-confluent-registry</module>
+               <module>flink-parquet</module>
        </modules>
 
        <!-- override these root dependencies as 'provided', so they don't end 
up

http://git-wip-us.apache.org/repos/asf/flink/blob/66b1f854/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index cd30e38..3540215 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1126,6 +1126,8 @@ under the License.
 
                                                
<exclude>flink-formats/flink-avro/src/test/resources/testdata.avro</exclude>
                                                
<exclude>flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/generated/*.java</exclude>
+                                               
<exclude>flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/*.java</exclude>
+                                               
<exclude>flink-formats/flink-parquet/src/test/resources/avro/**</exclude>
                                                
<exclude>flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/data_csv</exclude>
                                                
<exclude>flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/data_text</exclude>
                                                <!-- netty test file, still 
Apache License 2.0 but with a different header -->

http://git-wip-us.apache.org/repos/asf/flink/blob/66b1f854/tools/maven/suppressions.xml
----------------------------------------------------------------------
diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml
index ae029d6..4594d7f 100644
--- a/tools/maven/suppressions.xml
+++ b/tools/maven/suppressions.xml
@@ -24,6 +24,7 @@ under the License.
 
 <suppressions>
                <suppress 
files="org[\\/]apache[\\/]flink[\\/]formats[\\/]avro[\\/]generated[\\/].*.java" 
checks="[a-zA-Z0-9]*"/>
+               <suppress 
files="org[\\/]apache[\\/]flink[\\/]formats[\\/]parquet[\\/]generated[\\/].*.java"
 checks="[a-zA-Z0-9]*"/>
                <!-- Sometimes we have to temporarily fix very long, different 
formatted Calcite files. -->
                <suppress files="org[\\/]apache[\\/]calcite.*" 
checks="[a-zA-Z0-9]*"/>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/66b1f854/tools/travis_mvn_watchdog.sh
----------------------------------------------------------------------
diff --git a/tools/travis_mvn_watchdog.sh b/tools/travis_mvn_watchdog.sh
index a47475b..c4620c8 100755
--- a/tools/travis_mvn_watchdog.sh
+++ b/tools/travis_mvn_watchdog.sh
@@ -79,6 +79,7 @@ flink-filesystems/flink-mapr-fs,\
 flink-filesystems/flink-s3-fs-hadoop,\
 flink-filesystems/flink-s3-fs-presto,\
 flink-formats/flink-avro,\
+flink-formats/flink-parquet,\
 flink-connectors/flink-hbase,\
 flink-connectors/flink-hcatalog,\
 flink-connectors/flink-hadoop-compatibility,\

Reply via email to