[FLINK-9753] [formats] Add a Parquet BulkWriter
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fdb11c52 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fdb11c52 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fdb11c52 Branch: refs/heads/release-1.6 Commit: fdb11c52f1d0f39abceee02f4ad5beaf1034e05f Parents: f998f0f Author: Stephan Ewen <[email protected]> Authored: Fri Jul 20 16:14:31 2018 +0200 Committer: kkloudas <[email protected]> Committed: Mon Jul 23 14:23:25 2018 +0200 ---------------------------------------------------------------------- flink-formats/flink-parquet/pom.xml | 160 ++++++ .../flink/formats/parquet/ParquetBuilder.java | 39 ++ .../formats/parquet/ParquetBulkWriter.java | 64 +++ .../formats/parquet/ParquetWriterFactory.java | 60 +++ .../parquet/PositionOutputStreamAdapter.java | 73 +++ .../flink/formats/parquet/StreamOutputFile.java | 82 +++ .../parquet/avro/ParquetAvroWriters.java | 94 ++++ .../avro/ParquetStreamingFileSinkITCase.java | 241 +++++++++ .../formats/parquet/generated/Address.java | 517 +++++++++++++++++++ .../parquet/testutils/FiniteTestSource.java | 79 +++ .../src/test/resources/avro/testdata.avsc | 13 + .../src/test/resources/log4j-test.properties | 23 + flink-formats/pom.xml | 1 + pom.xml | 2 + tools/maven/suppressions.xml | 1 + tools/travis_mvn_watchdog.sh | 1 + 16 files changed, 1450 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fdb11c52/flink-formats/flink-parquet/pom.xml ---------------------------------------------------------------------- diff --git a/flink-formats/flink-parquet/pom.xml b/flink-formats/flink-parquet/pom.xml new file mode 100644 index 0000000..14e61ea --- /dev/null +++ b/flink-formats/flink-parquet/pom.xml @@ -0,0 +1,160 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. 
See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-formats</artifactId> + <version>1.6-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-parquet</artifactId> + <name>flink-parquet</name> + + <packaging>jar</packaging> + + <properties> + <flink.format.parquet.version>1.10.0</flink.format.parquet.version> + </properties> + + <dependencies> + + <!-- Flink dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <!-- Parquet Dependencies --> + + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-hadoop</artifactId> + <version>${flink.format.parquet.version}</version> + </dependency> + + <!-- Hadoop is needed by Parquet --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-shaded-hadoop2</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <!-- For now, fastutil is provided already by
flink-runtime --> + <dependency> + <groupId>it.unimi.dsi</groupId> + <artifactId>fastutil</artifactId> + <version>8.2.1</version> + <scope>provided</scope> + </dependency> + + <!-- Optional Parquet Builders for Formats like Avro, Protobuf, Thrift --> + + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-avro</artifactId> + <version>${flink.format.parquet.version}</version> + <optional>true</optional> + <exclusions> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + </exclusion> + <exclusion> + <groupId>it.unimi.dsi</groupId> + <artifactId>fastutil</artifactId> + </exclusion> + </exclusions> + </dependency> + + <!-- test dependencies (Scala-suffixed artifacts follow the build's scala.binary.version) --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-avro</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + </dependencies> + + + <build> + <plugins> + <!-- Generate Test class from avro schema --> + <plugin> + <groupId>org.apache.avro</groupId> + <artifactId>avro-maven-plugin</artifactId> + <version>${avro.version}</version> + <executions> + <execution> + <phase>generate-sources</phase> + <goals> + <goal>schema</goal> + </goals> + <configuration> + <testSourceDirectory>${project.basedir}/src/test/resources/avro</testSourceDirectory> + <testOutputDirectory>${project.basedir}/src/test/java/</testOutputDirectory> + </configuration> + </execution> + </executions> + </plugin> + + <!-- skip dependency convergence due to Hadoop dependency --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> +
<artifactId>maven-enforcer-plugin</artifactId> + <executions> + <execution> + <id>dependency-convergence</id> + <goals> + <goal>enforce</goal> + </goals> + <configuration> + <skip>true</skip> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/fdb11c52/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetBuilder.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetBuilder.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetBuilder.java new file mode 100644 index 0000000..d3b1370 --- /dev/null +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetBuilder.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.formats.parquet; + +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.io.OutputFile; + +import java.io.IOException; +import java.io.Serializable; + +/** + * A builder to create a {@link ParquetWriter} from a Parquet {@link OutputFile}. + * + * <p>Builders must be serializable because they are stored as a field of the + * {@link ParquetWriterFactory}, which is itself serialized. + * + * @param <T> The type of elements written by the writer. + */ +@FunctionalInterface +public interface ParquetBuilder<T> extends Serializable { + + /** + * Creates and configures a parquet writer to the given output file. + * + * @param out The Parquet output file that the created writer writes to. + * @return The created and configured ParquetWriter. + * @throws IOException Thrown if the writer cannot be created. + */ + ParquetWriter<T> createWriter(OutputFile out) throws IOException; +} http://git-wip-us.apache.org/repos/asf/flink/blob/fdb11c52/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetBulkWriter.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetBulkWriter.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetBulkWriter.java new file mode 100644 index 0000000..77bdc5b --- /dev/null +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetBulkWriter.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.parquet; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.BulkWriter; + +import org.apache.parquet.hadoop.ParquetWriter; + +import java.io.IOException; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A simple {@link BulkWriter} implementation that wraps a {@link ParquetWriter}. + * + * @param <T> The type of records written. + */ +@PublicEvolving +public class ParquetBulkWriter<T> implements BulkWriter<T> { + + /** The ParquetWriter to write to. */ + private final ParquetWriter<T> parquetWriter; + + /** + * Creates a new ParquetBulkWriter wrapping the given ParquetWriter. + * + * @param parquetWriter The ParquetWriter to write to. + * @throws NullPointerException Thrown if the given ParquetWriter is null. + */ + public ParquetBulkWriter(ParquetWriter<T> parquetWriter) { + this.parquetWriter = checkNotNull(parquetWriter, "parquetWriter"); + } + + @Override + public void addElement(T datum) throws IOException { + parquetWriter.write(datum); + } + + @Override + public void flush() { + // nothing we can do here: the wrapped ParquetWriter exposes no flush operation, + // so its buffered data is written out by Parquet itself or, at the latest, in finish() + } + + @Override + public void finish() throws IOException { + // closing the ParquetWriter completes the Parquet file; when this writer was created + // through ParquetWriterFactory, the target Flink stream itself stays open, because the + // PositionOutputStreamAdapter handed to Parquet does not close the stream it wraps + parquetWriter.close(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/fdb11c52/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetWriterFactory.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetWriterFactory.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetWriterFactory.java new file mode 100644 index 0000000..46245a4 --- /dev/null +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetWriterFactory.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.parquet; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.BulkWriter; +import org.apache.flink.core.fs.FSDataOutputStream; + +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.io.OutputFile; + +import java.io.IOException; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A factory that creates a Parquet {@link BulkWriter}. The factory takes a user-supplied + * builder to assemble Parquet's writer and then turns it into a Flink {@code BulkWriter}. + * + * @param <T> The type of record to write. + */ +@PublicEvolving +public class ParquetWriterFactory<T> implements BulkWriter.Factory<T> { + + private static final long serialVersionUID = 1L; + + /** The builder to construct the ParquetWriter. */ + private final ParquetBuilder<T> writerBuilder; + + /** + * Creates a new ParquetWriterFactory using the given builder to assemble the + * ParquetWriter. + * + * @param writerBuilder The builder to construct the ParquetWriter. + * @throws NullPointerException Thrown if the given builder is null.
+ */ + public ParquetWriterFactory(ParquetBuilder<T> writerBuilder) { + // fail fast here rather than with an NPE on the first call to create() + this.writerBuilder = checkNotNull(writerBuilder, "writerBuilder"); + } + + /** + * Creates a Parquet {@link BulkWriter} that writes to the given stream. The stream is wrapped + * into a single-use Parquet {@link OutputFile}, against which the builder creates the ParquetWriter. + * + * @param stream The Flink stream to write the Parquet data to. + * @return A BulkWriter wrapping the newly created ParquetWriter. + * @throws IOException Thrown if the builder fails to create the ParquetWriter. + */ + @Override + public BulkWriter<T> create(FSDataOutputStream stream) throws IOException { + final OutputFile out = new StreamOutputFile(stream); + final ParquetWriter<T> writer = writerBuilder.createWriter(out); + return new ParquetBulkWriter<>(writer); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/fdb11c52/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/PositionOutputStreamAdapter.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/PositionOutputStreamAdapter.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/PositionOutputStreamAdapter.java new file mode 100644 index 0000000..3949ba7 --- /dev/null +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/PositionOutputStreamAdapter.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.formats.parquet; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.fs.FSDataOutputStream; + +import org.apache.parquet.io.PositionOutputStream; + +import java.io.IOException; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * An adapter to turn Flink's {@link FSDataOutputStream} into a {@link PositionOutputStream}. + * + * <p>All writes, position queries and flushes are forwarded to the Flink stream; + * {@link #close()} deliberately does not close it. + */ +@Internal +class PositionOutputStreamAdapter extends PositionOutputStream { + + /** The Flink stream written to. */ + private final FSDataOutputStream out; + + /** + * Creates a new PositionOutputStreamAdapter. + * + * @param out The Flink stream written to. + * @throws NullPointerException Thrown if the given stream is null. + */ + PositionOutputStreamAdapter(FSDataOutputStream out) { + this.out = checkNotNull(out, "out"); + } + + @Override + public long getPos() throws IOException { + return out.getPos(); + } + + @Override + public void write(int b) throws IOException { + out.write(b); + } + + @Override + public void write(byte[] buffer, int off, int len) throws IOException { + out.write(buffer, off, len); + } + + @Override + public void flush() throws IOException { + out.flush(); + } + + @Override + public void close() { + // we do not actually close the internal stream here, so that finishing (closing) + // the Parquet writer does not also close the target Flink output stream + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/fdb11c52/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/StreamOutputFile.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/StreamOutputFile.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/StreamOutputFile.java new file mode 100644 index 0000000..70a8557 --- /dev/null +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/StreamOutputFile.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF)
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.parquet; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.fs.FSDataOutputStream; + +import org.apache.parquet.io.OutputFile; +import org.apache.parquet.io.PositionOutputStream; + +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * An implementation of Parquet's {@link OutputFile} interface that goes against + * a Flink {@link FSDataOutputStream}. + * + * <p>Because the implementation goes against an open stream, rather than open its + * own streams against a file, instances can create one stream only. + */ +@Internal +class StreamOutputFile implements OutputFile { + + /** The block size (64 MiB) reported by {@link #defaultBlockSize()}; block size hints are ignored. */ + private static final long DEFAULT_BLOCK_SIZE = 64L * 1024L * 1024L; + + /** The Flink stream that the single created Parquet stream writes to. */ + private final FSDataOutputStream stream; + + /** Whether the single stream of this file has already been created. */ + private final AtomicBoolean used; + + /** + * Creates a new StreamOutputFile. The first call to {@link #create(long)} + * or {@link #createOrOverwrite(long)} returns a stream that writes to the given stream. + * + * @param stream The stream to write to.
+ */ + StreamOutputFile(FSDataOutputStream stream) { + this.stream = checkNotNull(stream); + this.used = new AtomicBoolean(false); + } + + /** + * Returns a stream that writes to the wrapped Flink stream. The block size hint is ignored. + * + * @throws IllegalStateException If a stream against this file was already created. + */ + @Override + public PositionOutputStream create(long blockSizeHint) { + // atomically claim the one stream this file can hand out + if (used.compareAndSet(false, true)) { + return new PositionOutputStreamAdapter(stream); + } + else { + throw new IllegalStateException("A stream against this file was already created."); + } + } + + /** Same as {@link #create(long)}: there is only the one wrapped stream to write to. */ + @Override + public PositionOutputStream createOrOverwrite(long blockSizeHint) { + return create(blockSizeHint); + } + + /** Block size hints are not supported; {@link #create(long)} ignores them. */ + @Override + public boolean supportsBlockSize() { + return false; + } + + @Override + public long defaultBlockSize() { + return DEFAULT_BLOCK_SIZE; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/fdb11c52/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/ParquetAvroWriters.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/ParquetAvroWriters.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/ParquetAvroWriters.java new file mode 100644 index 0000000..87ddfdc --- /dev/null +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/ParquetAvroWriters.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.parquet.avro; + +import org.apache.flink.formats.parquet.ParquetBuilder; +import org.apache.flink.formats.parquet.ParquetWriterFactory; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificRecordBase; +import org.apache.parquet.avro.AvroParquetWriter; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.io.OutputFile; + +import java.io.IOException; + +/** + * Convenience builder to create {@link ParquetWriterFactory} instances for the different Avro types. + */ +public class ParquetAvroWriters { + + /** + * Creates a ParquetWriterFactory for an Avro specific type. The Parquet writers will use the + * schema of that specific type to build and write the columnar data. + * + * @param type The class of the type to write. + * @param <T> The Avro specific record type. + * @return A factory for Parquet writers that write records of the given type. + */ + public static <T extends SpecificRecordBase> ParquetWriterFactory<T> forSpecificRecord(Class<T> type) { + final String schemaString = SpecificData.get().getSchema(type).toString(); + final ParquetBuilder<T> builder = (out) -> createAvroParquetWriter(schemaString, SpecificData.get(), out); + return new ParquetWriterFactory<>(builder); + } + + /** + * Creates a ParquetWriterFactory that accepts and writes Avro generic types. + * The Parquet writers will use the given schema to build and write the columnar data. + * + * @param schema The schema of the generic type. + * @return A factory for Parquet writers that write generic records of the given schema.
+ */ + public static ParquetWriterFactory<GenericRecord> forGenericRecord(Schema schema) { + final String schemaString = schema.toString(); + final ParquetBuilder<GenericRecord> builder = (out) -> createAvroParquetWriter(schemaString, GenericData.get(), out); + return new ParquetWriterFactory<>(builder); + } + + /** + * Creates a ParquetWriterFactory for the given type. The Parquet writers will use Avro + * to reflectively create a schema for the type and use that schema to write the columnar data. + * + * @param type The class of the type to write. + * @param <T> The type of records to write. + * @return A factory for Parquet writers that write records of the given type. + */ + public static <T> ParquetWriterFactory<T> forReflectRecord(Class<T> type) { + final String schemaString = ReflectData.get().getSchema(type).toString(); + final ParquetBuilder<T> builder = (out) -> createAvroParquetWriter(schemaString, ReflectData.get(), out); + return new ParquetWriterFactory<>(builder); + } + + /** + * Creates an {@link AvroParquetWriter} against the given output file, using the given + * schema (in its string form) and Avro data model. + * + * <p>The factory methods above capture the schema only as a String in their serializable + * {@link ParquetBuilder} lambdas, and it is re-parsed here each time a writer is created. + * NOTE(review): presumably because Avro's {@code Schema} is not Serializable in the Avro + * version used - confirm before capturing the Schema object directly. + */ + private static <T> ParquetWriter<T> createAvroParquetWriter( + String schemaString, + GenericData dataModel, + OutputFile out) throws IOException { + + final Schema schema = new Schema.Parser().parse(schemaString); + + return AvroParquetWriter.<T>builder(out) + .withSchema(schema) + .withDataModel(dataModel) + .build(); + } + + // ------------------------------------------------------------------------ + + /** Class is not meant to be instantiated.
*/ + private ParquetAvroWriters() {} +} http://git-wip-us.apache.org/repos/asf/flink/blob/fdb11c52/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java new file mode 100644 index 0000000..d1f0a5f --- /dev/null +++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.formats.parquet.avro; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.core.fs.Path; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.formats.parquet.generated.Address; +import org.apache.flink.formats.parquet.testutils.FiniteTestSource; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; +import org.apache.flink.test.util.AbstractTestBase; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.specific.SpecificData; +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.avro.AvroParquetReader; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.io.InputFile; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.util.AbstractCollection; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * Simple integration test case for writing bulk encoded files with the + * {@link StreamingFileSink} with Parquet.
+ */ +@SuppressWarnings("serial") +public class ParquetStreamingFileSinkITCase extends AbstractTestBase { + + @Test + public void testWriteParquetAvroSpecific() throws Exception { + + final File folder = TEMPORARY_FOLDER.newFolder(); + + final List<Address> data = Arrays.asList( + new Address(1, "a", "b", "c", "12345"), + new Address(2, "p", "q", "r", "12345"), + new Address(3, "x", "y", "z", "12345") + ); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.enableCheckpointing(100); + + DataStream<Address> stream = env.addSource( + new FiniteTestSource<>(data), TypeInformation.of(Address.class)); + + stream.addSink( + StreamingFileSink.forBulkFormat( + Path.fromLocalFile(folder), + ParquetAvroWriters.forSpecificRecord(Address.class)) + .build()); + + env.execute(); + + validateResults(folder, SpecificData.get(), data); + } + + @Test + public void testWriteParquetAvroGeneric() throws Exception { + + final File folder = TEMPORARY_FOLDER.newFolder(); + + final Schema schema = Address.getClassSchema(); + + final Collection<GenericRecord> data = new GenericTestDataCollection(); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.enableCheckpointing(100); + + DataStream<GenericRecord> stream = env.addSource( + new FiniteTestSource<>(data), new GenericRecordAvroTypeInfo(schema)); + + stream.addSink( + StreamingFileSink.forBulkFormat( + Path.fromLocalFile(folder), + ParquetAvroWriters.forGenericRecord(schema)) + .build()); + + env.execute(); + + List<Address> expected = Arrays.asList( + new Address(1, "a", "b", "c", "12345"), + new Address(2, "x", "y", "z", "98765")); + + validateResults(folder, SpecificData.get(), expected); + } + + @Test + public void testWriteParquetAvroReflect() throws Exception { + + final File folder = TEMPORARY_FOLDER.newFolder(); + + final List<Datum> data = Arrays.asList( + new Datum("a", 1), new
Datum("b", 2), new Datum("c", 3)); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.enableCheckpointing(100); + + DataStream<Datum> stream = env.addSource( + new FiniteTestSource<>(data), TypeInformation.of(Datum.class)); + + stream.addSink( + StreamingFileSink.forBulkFormat( + Path.fromLocalFile(folder), + ParquetAvroWriters.forReflectRecord(Datum.class)) + .build()); + + env.execute(); + + validateResults(folder, ReflectData.get(), data); + } + + // ------------------------------------------------------------------------ + + /** + * Asserts that exactly one bucket with exactly one non-empty part file was written, and that + * this file contains exactly the expected records, in order. + */ + private static <T> void validateResults(File folder, GenericData dataModel, List<T> expected) throws Exception { + File[] buckets = folder.listFiles(); + assertNotNull(buckets); + assertEquals(1, buckets.length); + + File[] partFiles = buckets[0].listFiles(); + assertNotNull(partFiles); + assertEquals(1, partFiles.length); + assertTrue(partFiles[0].length() > 0); + + // read the records back with the same element type as the expected ones + List<T> results = readParquetFile(partFiles[0], dataModel); + assertEquals(expected, results); + } + + /** + * Reads all records of the given Parquet file, materializing them with the given Avro data model. + * The reader is closed once the file has been read completely (or reading failed). + */ + private static <T> List<T> readParquetFile(File file, GenericData dataModel) throws IOException { + InputFile inFile = HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(file.toURI()), new Configuration()); + + ArrayList<T> results = new ArrayList<>(); + try (ParquetReader<T> reader = AvroParquetReader.<T>builder(inFile).withDataModel(dataModel).build()) { + T next; + while ((next = reader.read()) != null) { + results.add(next); + } + } + + return results; + } + + /** + * Serializable collection of two generic Address records. The records are created anew on + * every iteration instead of being stored in fields (presumably because GenericData.Record + * is not Serializable). + */ + private static class GenericTestDataCollection extends AbstractCollection<GenericRecord> implements Serializable { + + @Override + public Iterator<GenericRecord> iterator() { + final GenericRecord rec1 = new GenericData.Record(Address.getClassSchema()); + rec1.put(0, 1); + rec1.put(1, "a"); + rec1.put(2, "b"); + rec1.put(3, "c"); + rec1.put(4, "12345"); + + final GenericRecord rec2 = new GenericData.Record(Address.getClassSchema()); +
rec2.put(0, 2); + rec2.put(1, "x"); + rec2.put(2, "y"); + rec2.put(3, "z"); + rec2.put(4, "98765"); + + return Arrays.asList(rec1, rec2).iterator(); + } + + @Override + public int size() { + return 2; + } + } + + // ------------------------------------------------------------------------ + + /** Test datum. */ + public static class Datum implements Serializable { + + public String a; + public int b; + + public Datum() {} + + public Datum(String a, int b) { + this.a = a; + this.b = b; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + Datum datum = (Datum) o; + return b == datum.b && (a != null ? a.equals(datum.a) : datum.a == null); + } + + @Override + public int hashCode() { + int result = a != null ? a.hashCode() : 0; + result = 31 * result + b; + return result; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/fdb11c52/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Address.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Address.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Address.java new file mode 100644 index 0000000..ca8f55f --- /dev/null +++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Address.java @@ -0,0 +1,517 @@ +/** + * Autogenerated by Avro + * + * DO NOT EDIT DIRECTLY + */ +package org.apache.flink.formats.parquet.generated; + +import org.apache.avro.specific.SpecificData; +import org.apache.avro.message.BinaryMessageEncoder; +import org.apache.avro.message.BinaryMessageDecoder; +import org.apache.avro.message.SchemaStore; + +@SuppressWarnings("all") +@org.apache.avro.specific.AvroGenerated +public class Address extends org.apache.avro.specific.SpecificRecordBase implements
org.apache.avro.specific.SpecificRecord { + private static final long serialVersionUID = -7342141701041388589L; + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Address\",\"namespace\":\"org.apache.flink.formats.parquet.generated\",\"fields\":[{\"name\":\"num\",\"type\":\"int\"},{\"name\":\"street\",\"type\":\"string\"},{\"name\":\"city\",\"type\":\"string\"},{\"name\":\"state\",\"type\":\"string\"},{\"name\":\"zip\",\"type\":\"string\"}]}"); + public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } + + private static SpecificData MODEL$ = new SpecificData(); + + private static final BinaryMessageEncoder<Address> ENCODER = + new BinaryMessageEncoder<Address>(MODEL$, SCHEMA$); + + private static final BinaryMessageDecoder<Address> DECODER = + new BinaryMessageDecoder<Address>(MODEL$, SCHEMA$); + + /** + * Return the BinaryMessageDecoder instance used by this class. + */ + public static BinaryMessageDecoder<Address> getDecoder() { + return DECODER; + } + + /** + * Create a new BinaryMessageDecoder instance for this class that uses the specified {@link SchemaStore}. + * @param resolver a {@link SchemaStore} used to find schemas by fingerprint + */ + public static BinaryMessageDecoder<Address> createDecoder(SchemaStore resolver) { + return new BinaryMessageDecoder<Address>(MODEL$, SCHEMA$, resolver); + } + + /** Serializes this Address to a ByteBuffer. */ + public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException { + return ENCODER.encode(this); + } + + /** Deserializes a Address from a ByteBuffer. 
*/ + public static Address fromByteBuffer( + java.nio.ByteBuffer b) throws java.io.IOException { + return DECODER.decode(b); + } + + @Deprecated public int num; + @Deprecated public java.lang.CharSequence street; + @Deprecated public java.lang.CharSequence city; + @Deprecated public java.lang.CharSequence state; + @Deprecated public java.lang.CharSequence zip; + + /** + * Default constructor. Note that this does not initialize fields + * to their default values from the schema. If that is desired then + * one should use <code>newBuilder()</code>. + */ + public Address() {} + + /** + * All-args constructor. + * @param num The new value for num + * @param street The new value for street + * @param city The new value for city + * @param state The new value for state + * @param zip The new value for zip + */ + public Address(java.lang.Integer num, java.lang.CharSequence street, java.lang.CharSequence city, java.lang.CharSequence state, java.lang.CharSequence zip) { + this.num = num; + this.street = street; + this.city = city; + this.state = state; + this.zip = zip; + } + + public org.apache.avro.Schema getSchema() { return SCHEMA$; } + // Used by DatumWriter. Applications should not call. + public java.lang.Object get(int field$) { + switch (field$) { + case 0: return num; + case 1: return street; + case 2: return city; + case 3: return state; + case 4: return zip; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + + // Used by DatumReader. Applications should not call. 
+ @SuppressWarnings(value="unchecked") + public void put(int field$, java.lang.Object value$) { + switch (field$) { + case 0: num = (java.lang.Integer)value$; break; + case 1: street = (java.lang.CharSequence)value$; break; + case 2: city = (java.lang.CharSequence)value$; break; + case 3: state = (java.lang.CharSequence)value$; break; + case 4: zip = (java.lang.CharSequence)value$; break; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + + /** + * Gets the value of the 'num' field. + * @return The value of the 'num' field. + */ + public java.lang.Integer getNum() { + return num; + } + + /** + * Sets the value of the 'num' field. + * @param value the value to set. + */ + public void setNum(java.lang.Integer value) { + this.num = value; + } + + /** + * Gets the value of the 'street' field. + * @return The value of the 'street' field. + */ + public java.lang.CharSequence getStreet() { + return street; + } + + /** + * Sets the value of the 'street' field. + * @param value the value to set. + */ + public void setStreet(java.lang.CharSequence value) { + this.street = value; + } + + /** + * Gets the value of the 'city' field. + * @return The value of the 'city' field. + */ + public java.lang.CharSequence getCity() { + return city; + } + + /** + * Sets the value of the 'city' field. + * @param value the value to set. + */ + public void setCity(java.lang.CharSequence value) { + this.city = value; + } + + /** + * Gets the value of the 'state' field. + * @return The value of the 'state' field. + */ + public java.lang.CharSequence getState() { + return state; + } + + /** + * Sets the value of the 'state' field. + * @param value the value to set. + */ + public void setState(java.lang.CharSequence value) { + this.state = value; + } + + /** + * Gets the value of the 'zip' field. + * @return The value of the 'zip' field. + */ + public java.lang.CharSequence getZip() { + return zip; + } + + /** + * Sets the value of the 'zip' field. 
+ * @param value the value to set. + */ + public void setZip(java.lang.CharSequence value) { + this.zip = value; + } + + /** + * Creates a new Address RecordBuilder. + * @return A new Address RecordBuilder + */ + public static org.apache.flink.formats.parquet.generated.Address.Builder newBuilder() { + return new org.apache.flink.formats.parquet.generated.Address.Builder(); + } + + /** + * Creates a new Address RecordBuilder by copying an existing Builder. + * @param other The existing builder to copy. + * @return A new Address RecordBuilder + */ + public static org.apache.flink.formats.parquet.generated.Address.Builder newBuilder(org.apache.flink.formats.parquet.generated.Address.Builder other) { + return new org.apache.flink.formats.parquet.generated.Address.Builder(other); + } + + /** + * Creates a new Address RecordBuilder by copying an existing Address instance. + * @param other The existing instance to copy. + * @return A new Address RecordBuilder + */ + public static org.apache.flink.formats.parquet.generated.Address.Builder newBuilder(org.apache.flink.formats.parquet.generated.Address other) { + return new org.apache.flink.formats.parquet.generated.Address.Builder(other); + } + + /** + * RecordBuilder for Address instances. + */ + public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Address> + implements org.apache.avro.data.RecordBuilder<Address> { + + private int num; + private java.lang.CharSequence street; + private java.lang.CharSequence city; + private java.lang.CharSequence state; + private java.lang.CharSequence zip; + + /** Creates a new Builder */ + private Builder() { + super(SCHEMA$); + } + + /** + * Creates a Builder by copying an existing Builder. + * @param other The existing Builder to copy. 
+ */ + private Builder(org.apache.flink.formats.parquet.generated.Address.Builder other) { + super(other); + if (isValidValue(fields()[0], other.num)) { + this.num = data().deepCopy(fields()[0].schema(), other.num); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.street)) { + this.street = data().deepCopy(fields()[1].schema(), other.street); + fieldSetFlags()[1] = true; + } + if (isValidValue(fields()[2], other.city)) { + this.city = data().deepCopy(fields()[2].schema(), other.city); + fieldSetFlags()[2] = true; + } + if (isValidValue(fields()[3], other.state)) { + this.state = data().deepCopy(fields()[3].schema(), other.state); + fieldSetFlags()[3] = true; + } + if (isValidValue(fields()[4], other.zip)) { + this.zip = data().deepCopy(fields()[4].schema(), other.zip); + fieldSetFlags()[4] = true; + } + } + + /** + * Creates a Builder by copying an existing Address instance + * @param other The existing instance to copy. + */ + private Builder(org.apache.flink.formats.parquet.generated.Address other) { + super(SCHEMA$); + if (isValidValue(fields()[0], other.num)) { + this.num = data().deepCopy(fields()[0].schema(), other.num); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.street)) { + this.street = data().deepCopy(fields()[1].schema(), other.street); + fieldSetFlags()[1] = true; + } + if (isValidValue(fields()[2], other.city)) { + this.city = data().deepCopy(fields()[2].schema(), other.city); + fieldSetFlags()[2] = true; + } + if (isValidValue(fields()[3], other.state)) { + this.state = data().deepCopy(fields()[3].schema(), other.state); + fieldSetFlags()[3] = true; + } + if (isValidValue(fields()[4], other.zip)) { + this.zip = data().deepCopy(fields()[4].schema(), other.zip); + fieldSetFlags()[4] = true; + } + } + + /** + * Gets the value of the 'num' field. + * @return The value. + */ + public java.lang.Integer getNum() { + return num; + } + + /** + * Sets the value of the 'num' field. 
+ * @param value The value of 'num'. + * @return This builder. + */ + public org.apache.flink.formats.parquet.generated.Address.Builder setNum(int value) { + validate(fields()[0], value); + this.num = value; + fieldSetFlags()[0] = true; + return this; + } + + /** + * Checks whether the 'num' field has been set. + * @return True if the 'num' field has been set, false otherwise. + */ + public boolean hasNum() { + return fieldSetFlags()[0]; + } + + + /** + * Clears the value of the 'num' field. + * @return This builder. + */ + public org.apache.flink.formats.parquet.generated.Address.Builder clearNum() { + fieldSetFlags()[0] = false; + return this; + } + + /** + * Gets the value of the 'street' field. + * @return The value. + */ + public java.lang.CharSequence getStreet() { + return street; + } + + /** + * Sets the value of the 'street' field. + * @param value The value of 'street'. + * @return This builder. + */ + public org.apache.flink.formats.parquet.generated.Address.Builder setStreet(java.lang.CharSequence value) { + validate(fields()[1], value); + this.street = value; + fieldSetFlags()[1] = true; + return this; + } + + /** + * Checks whether the 'street' field has been set. + * @return True if the 'street' field has been set, false otherwise. + */ + public boolean hasStreet() { + return fieldSetFlags()[1]; + } + + + /** + * Clears the value of the 'street' field. + * @return This builder. + */ + public org.apache.flink.formats.parquet.generated.Address.Builder clearStreet() { + street = null; + fieldSetFlags()[1] = false; + return this; + } + + /** + * Gets the value of the 'city' field. + * @return The value. + */ + public java.lang.CharSequence getCity() { + return city; + } + + /** + * Sets the value of the 'city' field. + * @param value The value of 'city'. + * @return This builder. 
+ */ + public org.apache.flink.formats.parquet.generated.Address.Builder setCity(java.lang.CharSequence value) { + validate(fields()[2], value); + this.city = value; + fieldSetFlags()[2] = true; + return this; + } + + /** + * Checks whether the 'city' field has been set. + * @return True if the 'city' field has been set, false otherwise. + */ + public boolean hasCity() { + return fieldSetFlags()[2]; + } + + + /** + * Clears the value of the 'city' field. + * @return This builder. + */ + public org.apache.flink.formats.parquet.generated.Address.Builder clearCity() { + city = null; + fieldSetFlags()[2] = false; + return this; + } + + /** + * Gets the value of the 'state' field. + * @return The value. + */ + public java.lang.CharSequence getState() { + return state; + } + + /** + * Sets the value of the 'state' field. + * @param value The value of 'state'. + * @return This builder. + */ + public org.apache.flink.formats.parquet.generated.Address.Builder setState(java.lang.CharSequence value) { + validate(fields()[3], value); + this.state = value; + fieldSetFlags()[3] = true; + return this; + } + + /** + * Checks whether the 'state' field has been set. + * @return True if the 'state' field has been set, false otherwise. + */ + public boolean hasState() { + return fieldSetFlags()[3]; + } + + + /** + * Clears the value of the 'state' field. + * @return This builder. + */ + public org.apache.flink.formats.parquet.generated.Address.Builder clearState() { + state = null; + fieldSetFlags()[3] = false; + return this; + } + + /** + * Gets the value of the 'zip' field. + * @return The value. + */ + public java.lang.CharSequence getZip() { + return zip; + } + + /** + * Sets the value of the 'zip' field. + * @param value The value of 'zip'. + * @return This builder. 
+ */ + public org.apache.flink.formats.parquet.generated.Address.Builder setZip(java.lang.CharSequence value) { + validate(fields()[4], value); + this.zip = value; + fieldSetFlags()[4] = true; + return this; + } + + /** + * Checks whether the 'zip' field has been set. + * @return True if the 'zip' field has been set, false otherwise. + */ + public boolean hasZip() { + return fieldSetFlags()[4]; + } + + + /** + * Clears the value of the 'zip' field. + * @return This builder. + */ + public org.apache.flink.formats.parquet.generated.Address.Builder clearZip() { + zip = null; + fieldSetFlags()[4] = false; + return this; + } + + @Override + @SuppressWarnings("unchecked") + public Address build() { + try { + Address record = new Address(); + record.num = fieldSetFlags()[0] ? this.num : (java.lang.Integer) defaultValue(fields()[0]); + record.street = fieldSetFlags()[1] ? this.street : (java.lang.CharSequence) defaultValue(fields()[1]); + record.city = fieldSetFlags()[2] ? this.city : (java.lang.CharSequence) defaultValue(fields()[2]); + record.state = fieldSetFlags()[3] ? this.state : (java.lang.CharSequence) defaultValue(fields()[3]); + record.zip = fieldSetFlags()[4] ? 
this.zip : (java.lang.CharSequence) defaultValue(fields()[4]); + return record; + } catch (java.lang.Exception e) { + throw new org.apache.avro.AvroRuntimeException(e); + } + } + } + + @SuppressWarnings("unchecked") + private static final org.apache.avro.io.DatumWriter<Address> + WRITER$ = (org.apache.avro.io.DatumWriter<Address>)MODEL$.createDatumWriter(SCHEMA$); + + @Override public void writeExternal(java.io.ObjectOutput out) + throws java.io.IOException { + WRITER$.write(this, SpecificData.getEncoder(out)); + } + + @SuppressWarnings("unchecked") + private static final org.apache.avro.io.DatumReader<Address> + READER$ = (org.apache.avro.io.DatumReader<Address>)MODEL$.createDatumReader(SCHEMA$); + + @Override public void readExternal(java.io.ObjectInput in) + throws java.io.IOException { + READER$.read(this, SpecificData.getDecoder(in)); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/fdb11c52/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/testutils/FiniteTestSource.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/testutils/FiniteTestSource.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/testutils/FiniteTestSource.java new file mode 100644 index 0000000..03db7ff --- /dev/null +++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/testutils/FiniteTestSource.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.testutils;
+
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.util.Arrays;
+
+/**
+ * A bounded test source. It emits every element it was given while holding the
+ * checkpoint lock, so no checkpoint can interleave with the emission. It then keeps
+ * running until two further checkpoints have completed, or until it is cancelled.
+ */
+@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+public class FiniteTestSource<T> implements SourceFunction<T>, CheckpointListener {
+
+	private static final long serialVersionUID = 1L;
+
+	/** Number of checkpoint completions {@link #run} waits for after emitting all elements. */
+	private static final int CHECKPOINTS_TO_AWAIT = 2;
+
+	/** The elements emitted by {@link #run}, in iteration order. */
+	@SuppressWarnings("NonSerializableFieldInSerializableClass")
+	private final Iterable<T> elements;
+
+	/** Cleared by {@link #cancel()} to stop waiting for checkpoints. */
+	private volatile boolean running = true;
+
+	/** Count of checkpoint-complete notifications. It is transient, so it starts at 0 in every deserialized copy. */
+	private transient int completedCheckpoints;
+
+	/**
+	 * Creates a source that emits the given elements.
+	 *
+	 * @param elements the elements to emit, in order
+	 */
+	@SafeVarargs
+	public FiniteTestSource(T... elements) {
+		this(Arrays.asList(elements));
+	}
+
+	/**
+	 * Creates a source that emits the elements of the given iterable.
+	 *
+	 * @param elements the elements to emit. NOTE(review): stored as-is and shipped with the
+	 *                 source, so it presumably must be serializable (see the suppressed warning).
+	 */
+	public FiniteTestSource(Iterable<T> elements) {
+		this.elements = elements;
+	}
+
+	@Override
+	public void run(SourceContext<T> ctx) throws Exception {
+		final Object checkpointLock = ctx.getCheckpointLock();
+
+		// The target is fixed before emitting. A checkpoint that is already in flight may
+		// complete after the emission without covering it, which is presumably why two
+		// completions are awaited rather than one.
+		final int targetCount;
+		synchronized (checkpointLock) {
+			targetCount = completedCheckpoints + CHECKPOINTS_TO_AWAIT;
+			for (final T element : elements) {
+				ctx.collect(element);
+			}
+		}
+
+		// The lock is released between the two blocks, and wait() releases it too, so
+		// checkpoints can complete while this thread is waiting.
+		synchronized (checkpointLock) {
+			while (isAwaitingCheckpoints(targetCount)) {
+				// Timed wait: nothing in this class notifies the lock, so poll every millisecond.
+				checkpointLock.wait(1);
+			}
+		}
+	}
+
+	/**
+	 * Returns whether {@link #run} should keep waiting.
+	 *
+	 * @param targetCount the completion count to reach before returning
+	 * @return true while the source is running and fewer than {@code targetCount} checkpoints completed
+	 */
+	private boolean isAwaitingCheckpoints(int targetCount) {
+		return running && completedCheckpoints < targetCount;
+	}
+
+	@Override
+	public void cancel() {
+		this.running = false;
+	}
+
+	/**
+	 * Counts every completed checkpoint; the id itself is not inspected.
+	 * NOTE(review): the counter is a plain int that {@link #run} reads under the checkpoint
+	 * lock. This assumes the caller invokes this method while holding that same lock; confirm.
+	 */
+	@Override
+	public void notifyCheckpointComplete(long checkpointId) throws Exception {
+		completedCheckpoints += 1;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fdb11c52/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc
----------------------------------------------------------------------
diff --git a/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc b/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc
new file mode 100644
index 0000000..c781900
--- /dev/null
+++ b/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc
@@ -0,0 +1,13 @@
+[
+{"namespace": "org.apache.flink.formats.parquet.generated",
+ "type": "record",
+ "name": "Address",
+ "fields": [
+ {"name": "num", "type": "int"},
+ {"name": "street", "type": "string"},
+ {"name": "city", "type": "string"},
+ {"name": "state", "type": "string"},
+ {"name": "zip", "type": "string"}
+ ]
+}
+]
http://git-wip-us.apache.org/repos/asf/flink/blob/fdb11c52/flink-formats/flink-parquet/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-formats/flink-parquet/src/test/resources/log4j-test.properties b/flink-formats/flink-parquet/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2a30ab8
--- /dev/null
+++
b/flink-formats/flink-parquet/src/test/resources/log4j-test.properties @@ -0,0 +1,23 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=OFF, testlogger +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n http://git-wip-us.apache.org/repos/asf/flink/blob/fdb11c52/flink-formats/pom.xml ---------------------------------------------------------------------- diff --git a/flink-formats/pom.xml b/flink-formats/pom.xml index 2fb2c67..869683f 100644 --- a/flink-formats/pom.xml +++ b/flink-formats/pom.xml @@ -39,6 +39,7 @@ under the License. 
<module>flink-avro</module> <module>flink-json</module> <module>flink-avro-confluent-registry</module> + <module>flink-parquet</module> </modules> <!-- override these root dependencies as 'provided', so they don't end up http://git-wip-us.apache.org/repos/asf/flink/blob/fdb11c52/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 65cfacb..b890187 100644 --- a/pom.xml +++ b/pom.xml @@ -1126,6 +1126,8 @@ under the License. <exclude>flink-formats/flink-avro/src/test/resources/testdata.avro</exclude> <exclude>flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/generated/*.java</exclude> + <exclude>flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/*.java</exclude> + <exclude>flink-formats/flink-parquet/src/test/resources/avro/**</exclude> <exclude>flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/data_csv</exclude> <exclude>flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/data_text</exclude> <!-- netty test file, still Apache License 2.0 but with a different header --> http://git-wip-us.apache.org/repos/asf/flink/blob/fdb11c52/tools/maven/suppressions.xml ---------------------------------------------------------------------- diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml index ae029d6..4594d7f 100644 --- a/tools/maven/suppressions.xml +++ b/tools/maven/suppressions.xml @@ -24,6 +24,7 @@ under the License. <suppressions> <suppress files="org[\\/]apache[\\/]flink[\\/]formats[\\/]avro[\\/]generated[\\/].*.java" checks="[a-zA-Z0-9]*"/> + <suppress files="org[\\/]apache[\\/]flink[\\/]formats[\\/]parquet[\\/]generated[\\/].*.java" checks="[a-zA-Z0-9]*"/> <!-- Sometimes we have to temporarily fix very long, different formatted Calcite files. 
--> <suppress files="org[\\/]apache[\\/]calcite.*" checks="[a-zA-Z0-9]*"/> http://git-wip-us.apache.org/repos/asf/flink/blob/fdb11c52/tools/travis_mvn_watchdog.sh ---------------------------------------------------------------------- diff --git a/tools/travis_mvn_watchdog.sh b/tools/travis_mvn_watchdog.sh index a47475b..c4620c8 100755 --- a/tools/travis_mvn_watchdog.sh +++ b/tools/travis_mvn_watchdog.sh @@ -79,6 +79,7 @@ flink-filesystems/flink-mapr-fs,\ flink-filesystems/flink-s3-fs-hadoop,\ flink-filesystems/flink-s3-fs-presto,\ flink-formats/flink-avro,\ +flink-formats/flink-parquet,\ flink-connectors/flink-hbase,\ flink-connectors/flink-hcatalog,\ flink-connectors/flink-hadoop-compatibility,\
