This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 5d9a9b8 [FLINK-10114] Add ORC BulkWriter support for StreamingFileSink
5d9a9b8 is described below
commit 5d9a9b85edf4994f05518f5cbaf6cd3e2888ae43
Author: Sivaprasanna S <[email protected]>
AuthorDate: Fri Mar 20 22:46:20 2020 +0530
[FLINK-10114] Add ORC BulkWriter support for StreamingFileSink
This closes #11474.
---
docs/dev/connectors/streamfile_sink.md | 202 ++++++++++-
flink-formats/flink-orc/pom.xml | 8 +
.../org/apache/flink/orc/vector/Vectorizer.java | 96 ++++++
.../org/apache/flink/orc/writer/OrcBulkWriter.java | 77 +++++
.../flink/orc/writer/OrcBulkWriterFactory.java | 126 +++++++
.../flink/orc/writer/PhysicalWriterImpl.java | 371 +++++++++++++++++++++
.../java/org/apache/flink/orc/data/Record.java | 59 ++++
.../flink/orc/util/OrcBulkWriterTestUtil.java | 98 ++++++
.../apache/flink/orc/vector/RecordVectorizer.java | 57 ++++
.../flink/orc/writer/OrcBulkWriterITCase.java | 77 +++++
.../apache/flink/orc/writer/OrcBulkWriterTest.java | 83 +++++
11 files changed, 1252 insertions(+), 2 deletions(-)
diff --git a/docs/dev/connectors/streamfile_sink.md
b/docs/dev/connectors/streamfile_sink.md
index 9fe047e..271019a 100644
--- a/docs/dev/connectors/streamfile_sink.md
+++ b/docs/dev/connectors/streamfile_sink.md
@@ -136,12 +136,12 @@ specifying an `Encoder` we have to specify
[BulkWriter.Factory]({{ site.javadocs
The `BulkWriter` logic defines how new elements are added and flushed, and how a bulk of records
is finalized for further encoding purposes.
-Flink comes with three built-in BulkWriter factories:
+Flink comes with four built-in BulkWriter factories:
 - [ParquetWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/parquet/ParquetWriterFactory.html)
 - [SequenceFileWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.html)
 - [CompressWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/compress/CompressWriterFactory.html)
-
+ - [OrcBulkWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.html)
<div class="alert alert-info">
<b>IMPORTANT:</b> Bulk Formats can only have `OnCheckpointRollingPolicy`,
which rolls (ONLY) on every checkpoint.
@@ -204,6 +204,204 @@ input.addSink(sink)
</div>
</div>
+#### ORC Format
+
+To enable the data to be bulk encoded in ORC format, Flink offers [OrcBulkWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.html),
+which takes a concrete implementation of [Vectorizer]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/orc/vector/Vectorizer.html).
+
+Like any other columnar format that encodes data in bulk fashion, Flink's `OrcBulkWriter` writes the input elements in batches. It uses
+ORC's `VectorizedRowBatch` to achieve this.
+
+Since the input element has to be transformed into a `VectorizedRowBatch`, users have to extend the abstract `Vectorizer`
+class and override the `vectorize(T element, VectorizedRowBatch batch)` method. The method provides an
+instance of `VectorizedRowBatch` to write into, so users only have to implement the logic that transforms the
+input `element` to `ColumnVector`s and sets them in the provided `VectorizedRowBatch` instance.
+
+For example, if the input element is of type `Person`, which looks like:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+class Person {
+ private final String name;
+ private final int age;
+ ...
+}
+
+{% endhighlight %}
+</div>
+</div>
+
+Then a child implementation that converts elements of type `Person` and sets them in the `VectorizedRowBatch` can look like this:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+
+public class PersonVectorizer extends Vectorizer<Person> implements
Serializable {
+ public PersonVectorizer(String schema) {
+ super(schema);
+ }
+ @Override
+ public void vectorize(Person element, VectorizedRowBatch batch) throws
IOException {
+ BytesColumnVector nameColVector = (BytesColumnVector)
batch.cols[0];
+ LongColumnVector ageColVector = (LongColumnVector)
batch.cols[1];
+ int row = batch.size++;
+ nameColVector.setVal(row,
element.getName().getBytes(StandardCharsets.UTF_8));
+ ageColVector.vector[row] = element.getAge();
+ }
+}
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import java.nio.charset.StandardCharsets
+import org.apache.hadoop.hive.ql.exec.vector.{BytesColumnVector, LongColumnVector, VectorizedRowBatch}
+
+class PersonVectorizer(schema: String) extends Vectorizer[Person](schema) {
+
+  override def vectorize(element: Person, batch: VectorizedRowBatch): Unit = {
+    val nameColVector = batch.cols(0).asInstanceOf[BytesColumnVector]
+    val ageColVector = batch.cols(1).asInstanceOf[LongColumnVector]
+    val row = batch.size
+    batch.size += 1
+    nameColVector.setVal(row, element.getName.getBytes(StandardCharsets.UTF_8))
+    ageColVector.vector(row) = element.getAge
+  }
+
+}
+
+{% endhighlight %}
+</div>
+</div>
+
+To use the ORC bulk encoder in an application, users need to add the following
dependency:
+
+{% highlight xml %}
+<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-orc{{ site.scala_version_suffix }}</artifactId>
+ <version>{{ site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+And then a `StreamingFileSink` that writes data in ORC format can be created
like this:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+import
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.orc.writer.OrcBulkWriterFactory;
+
+String schema = "struct<_col0:string,_col1:int>";
+DataStream<Person> input = ...;
+
+final OrcBulkWriterFactory<Person> writerFactory = new
OrcBulkWriterFactory<>(new PersonVectorizer(schema));
+
+final StreamingFileSink<Person> sink = StreamingFileSink
+ .forBulkFormat(outputBasePath, writerFactory)
+ .build();
+
+input.addSink(sink);
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+import org.apache.flink.orc.writer.OrcBulkWriterFactory
+
+val schema: String = "struct<_col0:string,_col1:int>"
+val input: DataStream[Person] = ...
+val writerFactory = new OrcBulkWriterFactory(new PersonVectorizer(schema))
+
+val sink: StreamingFileSink[Person] = StreamingFileSink
+ .forBulkFormat(outputBasePath, writerFactory)
+ .build()
+
+input.addSink(sink)
+
+{% endhighlight %}
+</div>
+</div>
+
+`OrcBulkWriterFactory` can also take Hadoop `Configuration` and `Properties` so that a custom Hadoop configuration and ORC
+writer properties can be provided.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+String schema = ...;
+Configuration conf = ...;
+Properties writerProperties = new Properties();
+
+writerProperties.setProperty("orc.compress", "LZ4");
+// Other ORC supported properties can also be set similarly.
+
+final OrcBulkWriterFactory<Person> writerFactory = new OrcBulkWriterFactory<>(
+ new PersonVectorizer(schema), writerProperties, conf);
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val schema: String = ...
+val conf: Configuration = ...
+val writerProperties: Properties = new Properties()
+
+writerProperties.setProperty("orc.compress", "LZ4")
+// Other ORC supported properties can also be set similarly.
+
+val writerFactory = new OrcBulkWriterFactory(
+ new PersonVectorizer(schema), writerProperties, conf)
+{% endhighlight %}
+</div>
+</div>
+
+The complete list of ORC writer properties can be found
[here](https://orc.apache.org/docs/hive-config.html).
+
+Users who want to add user metadata to the ORC files can do so by calling `addUserMetadata(...)` inside the overridden
+`vectorize(...)` method.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+public class PersonVectorizer extends Vectorizer<Person> implements
Serializable {
+ @Override
+ public void vectorize(Person element, VectorizedRowBatch batch) throws
IOException {
+ ...
+ String metadataKey = ...;
+ ByteBuffer metadataValue = ...;
+ this.addUserMetadata(metadataKey, metadataValue);
+ }
+}
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+class PersonVectorizer(schema: String) extends Vectorizer[Person](schema) {
+
+ override def vectorize(element: Person, batch: VectorizedRowBatch): Unit = {
+ ...
+ val metadataKey: String = ...
+ val metadataValue: ByteBuffer = ...
+ addUserMetadata(metadataKey, metadataValue)
+ }
+
+}
+
+{% endhighlight %}
+</div>
+</div>
+
#### Hadoop SequenceFile format
To use the SequenceFile bulk encoder in your application you need to add the
following dependency:
diff --git a/flink-formats/flink-orc/pom.xml b/flink-formats/flink-orc/pom.xml
index 879c8d3..dd8e4a5 100644
--- a/flink-formats/flink-orc/pom.xml
+++ b/flink-formats/flink-orc/pom.xml
@@ -114,6 +114,14 @@ under the License.
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
</dependencies>
<build>
diff --git
a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/vector/Vectorizer.java
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/vector/Vectorizer.java
new file mode 100644
index 0000000..a273a7d
--- /dev/null
+++
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/vector/Vectorizer.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc.vector;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.orc.writer.OrcBulkWriter;
+
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class provides an abstracted set of methods to handle the lifecycle of
{@link VectorizedRowBatch}.
+ *
+ * <p>Users have to extend this class and override the vectorize() method with
the logic
+ * to transform the element to a {@link VectorizedRowBatch}.
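+ *
+ * <p>A minimal sketch of such an implementation could look like the following
+ * (the {@code Person} type is only an illustrative example, not part of this class):
+ *
+ * <pre>{@code
+ * public class PersonVectorizer extends Vectorizer<Person> implements Serializable {
+ *
+ *     public PersonVectorizer(String schema) {
+ *         super(schema);
+ *     }
+ *
+ *     public void vectorize(Person element, VectorizedRowBatch batch) throws IOException {
+ *         BytesColumnVector nameColVector = (BytesColumnVector) batch.cols[0];
+ *         int row = batch.size++;
+ *         nameColVector.setVal(row, element.getName().getBytes(StandardCharsets.UTF_8));
+ *     }
+ * }
+ * }</pre>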
+ *
+ * @param <T> The type of the element
+ */
+@PublicEvolving
+public abstract class Vectorizer<T> implements Serializable {
+
+ private final TypeDescription schema;
+
+ private transient Writer writer;
+
+ public Vectorizer(final String schema) {
+ checkNotNull(schema);
+ this.schema = TypeDescription.fromString(schema);
+ }
+
+ /**
+ * Provides the ORC schema.
+ *
+ * @return the ORC schema
+ */
+ public TypeDescription getSchema() {
+ return this.schema;
+ }
+
+ /**
+ * Users are not supposed to call this method; it is intended to be used only by
+ * the {@link OrcBulkWriter}.
+ *
+ * @param writer the underlying ORC Writer.
+ */
+ public void setWriter(Writer writer) {
+ this.writer = writer;
+ }
+
+ /**
+ * Adds arbitrary user metadata to the outgoing ORC file.
+ *
+ * <p>Users who want to dynamically add new metadata, either based on the input
+ * or from an external system, can do so by calling <code>addUserMetadata(...)</code>
+ * inside the overridden vectorize() method.
+ *
+ * @param key a key to label the data with.
+ * @param value the contents of the metadata.
+ */
+ public void addUserMetadata(String key, ByteBuffer value) {
+ this.writer.addUserMetadata(key, value);
+ }
+
+ /**
+ * Transforms the provided element to ColumnVectors and
+ * sets them in the exposed VectorizedRowBatch.
+ *
+ * @param element The input element
+ * @param batch The batch to write the ColumnVectors
+ * @throws IOException if there is an error while transforming the
input.
+ */
+ public abstract void vectorize(T element, VectorizedRowBatch batch)
throws IOException;
+
+}
diff --git
a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/OrcBulkWriter.java
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/OrcBulkWriter.java
new file mode 100644
index 0000000..e10c25a
--- /dev/null
+++
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/OrcBulkWriter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.orc.vector.Vectorizer;
+
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.Writer;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link BulkWriter} implementation that writes data in ORC format.
+ *
+ * @param <T> The type of element written.
+ */
+@Internal
+public class OrcBulkWriter<T> implements BulkWriter<T> {
+
+ private final Writer writer;
+ private final Vectorizer<T> vectorizer;
+ private final VectorizedRowBatch rowBatch;
+
+ OrcBulkWriter(Vectorizer<T> vectorizer, Writer writer) {
+ this.vectorizer = checkNotNull(vectorizer);
+ this.writer = checkNotNull(writer);
+ this.rowBatch = vectorizer.getSchema().createRowBatch();
+
+ // Configure the vectorizer with the writer so that users can
add
+ // metadata on the fly through the Vectorizer#vectorize(...)
method.
+ this.vectorizer.setWriter(this.writer);
+ }
+
+ @Override
+ public void addElement(T element) throws IOException {
+ vectorizer.vectorize(element, rowBatch);
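+ // Write the batch out and reset it once it reaches its maximum capacity,
+ // so that memory usage stays bounded.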
+ if (rowBatch.size == rowBatch.getMaxSize()) {
+ writer.addRowBatch(rowBatch);
+ rowBatch.reset();
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ if (rowBatch.size != 0) {
+ writer.addRowBatch(rowBatch);
+ rowBatch.reset();
+ }
+ }
+
+ @Override
+ public void finish() throws IOException {
+ flush();
+ writer.close();
+ }
+
+}
diff --git
a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.java
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.java
new file mode 100644
index 0000000..b34ef07
--- /dev/null
+++
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.orc.vector.Vectorizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.orc.OrcFile;
+import org.apache.orc.impl.WriterImpl;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A factory that creates an ORC {@link BulkWriter}. The factory takes a user
+ * supplied {@link Vectorizer} implementation to convert the element into an
+ * {@link org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch}.
+ *
+ * @param <T> The type of element to write.
+ */
+@PublicEvolving
+public class OrcBulkWriterFactory<T> implements BulkWriter.Factory<T> {
+
+ /*
+ A dummy Hadoop Path to work around the current implementation of ORC WriterImpl,
+ which works on the basis of a Hadoop FileSystem and a Hadoop Path. Since we use a
+ customised ORC PhysicalWriter implementation that writes to Flink's own
+ FSDataOutputStream instead of Hadoop's FSDataOutputStream, this Path is never
+ actually used for I/O.
+ */
+ private static final Path FIXED_PATH = new Path(".");
+
+ private final Vectorizer<T> vectorizer;
+ private final Properties writerProperties;
+ private final Map<String, String> confMap;
+
+ private OrcFile.WriterOptions writerOptions;
+
+ /**
+ * Creates a new OrcBulkWriterFactory using the provided Vectorizer
+ * implementation.
+ *
+ * @param vectorizer The vectorizer implementation to convert input
+ * record to a VectorizedRowBatch.
+ */
+ public OrcBulkWriterFactory(Vectorizer<T> vectorizer) {
+ this(vectorizer, new Configuration());
+ }
+
+ /**
+ * Creates a new OrcBulkWriterFactory using the provided Vectorizer
+ * implementation and Hadoop Configuration.
+ *
+ * @param vectorizer The vectorizer implementation to convert input
+ * record to a VectorizedRowBatch.
+ * @param configuration The Hadoop Configuration to use.
+ */
+ public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Configuration
configuration) {
+ this(vectorizer, null, configuration);
+ }
+
+ /**
+ * Creates a new OrcBulkWriterFactory using the provided Vectorizer
+ * implementation, ORC writer properties and Hadoop Configuration.
+ *
+ * @param vectorizer The vectorizer implementation to convert input
+ * record to a VectorizedRowBatch.
+ * @param writerProperties Properties that can be used in ORC WriterOptions.
+ * @param configuration The Hadoop Configuration to use.
+ */
+ public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Properties
writerProperties, Configuration configuration) {
+ this.vectorizer = checkNotNull(vectorizer);
+ this.writerProperties = writerProperties;
+ this.confMap = new HashMap<>();
+
+ // Todo: Replace the Map based approach with a better approach
+ for (Map.Entry<String, String> entry : configuration) {
+ confMap.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ @Override
+ public BulkWriter<T> create(FSDataOutputStream out) throws IOException {
+ OrcFile.WriterOptions opts = getWriterOptions();
+ opts.physicalWriter(new PhysicalWriterImpl(out, opts));
+
+ return new OrcBulkWriter<>(vectorizer, new WriterImpl(null,
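+ // The null FileSystem and the dummy FIXED_PATH passed to WriterImpl are effectively
+ // ignored, because the PhysicalWriterImpl configured above performs all I/O against
+ // Flink's FSDataOutputStream.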
FIXED_PATH, opts));
+ }
+
+ private OrcFile.WriterOptions getWriterOptions() {
+ if (null == writerOptions) {
+ Configuration conf = new Configuration();
+ for (Map.Entry<String, String> entry :
confMap.entrySet()) {
+ conf.set(entry.getKey(), entry.getValue());
+ }
+
+ writerOptions = OrcFile.writerOptions(writerProperties,
conf);
+ writerOptions.setSchema(this.vectorizer.getSchema());
+ }
+
+ return writerOptions;
+ }
+}
+
diff --git
a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/PhysicalWriterImpl.java
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/PhysicalWriterImpl.java
new file mode 100644
index 0000000..cf9a06b
--- /dev/null
+++
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/PhysicalWriterImpl.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.FSDataOutputStream;
+
+import com.google.protobuf.CodedOutputStream;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcProto;
+import org.apache.orc.PhysicalWriter;
+import org.apache.orc.impl.HadoopShims;
+import org.apache.orc.impl.OrcCodecPool;
+import org.apache.orc.impl.OutStream;
+import org.apache.orc.impl.StreamName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.orc.impl.WriterImpl.getEstimatedBufferSize;
+
+/**
+ * A slightly customised clone of {@link org.apache.orc.impl.PhysicalFsWriter}.
+ *
+ * <p>Whereas the PhysicalFsWriter implementation works on the basis of a Path,
+ * this implementation leverages Flink's {@link FSDataOutputStream} to write
+ * the compressed data.
+ *
+ * <p>NOTE: If the ORC dependency version is updated, this file may have to be
+ * updated as well to be in sync with the new version's PhysicalFsWriter.
+ */
+@Internal
+public class PhysicalWriterImpl implements PhysicalWriter {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(PhysicalWriterImpl.class);
+ private static final byte[] ZEROS = new byte[64 * 1024];
+ private static final int HDFS_BUFFER_SIZE = 256 * 1024;
+
+ private final OutStream writer;
+ private final CodedOutputStream protobufWriter;
+ private final CompressionKind compress;
+ private final Map<StreamName, BufferedStream> streams;
+ private final HadoopShims shims;
+ private final int maxPadding;
+ private final int bufferSize;
+ private final long blockSize;
+ private final boolean addBlockPadding;
+ private final boolean writeVariableLengthBlocks;
+
+ private CompressionCodec codec;
+ private FSDataOutputStream out;
+ private long headerLength;
+ private long stripeStart;
+ private long blockOffset;
+ private int metadataLength;
+ private int footerLength;
+
+ PhysicalWriterImpl(FSDataOutputStream out, OrcFile.WriterOptions opts)
throws IOException {
+ if (opts.isEnforceBufferSize()) {
+ this.bufferSize = opts.getBufferSize();
+ } else {
+ this.bufferSize = getEstimatedBufferSize(
+ opts.getStripeSize(),
opts.getSchema().getMaximumId() + 1, opts.getBufferSize());
+ }
+
+ this.out = out;
+ this.blockOffset = 0;
+ this.blockSize = opts.getBlockSize();
+ this.maxPadding = (int) (opts.getPaddingTolerance() * (double)
opts.getBufferSize());
+ this.compress = opts.getCompress();
+ this.codec = OrcCodecPool.getCodec(this.compress);
+ this.streams = new TreeMap<>();
+ this.writer = new OutStream("metadata", this.bufferSize,
this.codec, new DirectStream(this.out));
+ this.shims = opts.getHadoopShims();
+ this.addBlockPadding = opts.getBlockPadding();
+ this.protobufWriter =
CodedOutputStream.newInstance(this.writer);
+ this.writeVariableLengthBlocks =
opts.getWriteVariableLengthBlocks();
+ }
+
+ @Override
+ public void writeHeader() throws IOException {
+ this.out.write("ORC".getBytes());
+ this.headerLength = this.out.getPos();
+ }
+
+ @Override
+ public OutputReceiver createDataStream(StreamName name) throws
IOException {
+ BufferedStream result = streams.get(name);
+
+ if (result == null) {
+ result = new BufferedStream();
+ streams.put(name, result);
+ }
+
+ return result;
+ }
+
+ @Override
+ public void writeIndex(StreamName name, OrcProto.RowIndex.Builder index,
+ CompressionCodec codec)
throws IOException {
+ OutputStream stream = new OutStream(this.toString(),
bufferSize, codec, createDataStream(name));
+ index.build().writeTo(stream);
+ stream.flush();
+ }
+
+ @Override
+ public void writeBloomFilter(StreamName name,
OrcProto.BloomFilterIndex.Builder bloom,
+
CompressionCodec codec) throws IOException {
+ OutputStream stream = new OutStream(this.toString(),
bufferSize, codec, createDataStream(name));
+ bloom.build().writeTo(stream);
+ stream.flush();
+ }
+
+ @Override
+ public void finalizeStripe(OrcProto.StripeFooter.Builder footerBuilder,
+
OrcProto.StripeInformation.Builder dirEntry) throws IOException {
+ long indexSize = 0;
+ long dataSize = 0;
+
+ for (Map.Entry<StreamName, BufferedStream> pair:
streams.entrySet()) {
+ BufferedStream receiver = pair.getValue();
+ if (!receiver.isSuppressed) {
+ long streamSize = receiver.getOutputSize();
+ StreamName name = pair.getKey();
+
footerBuilder.addStreams(OrcProto.Stream.newBuilder().setColumn(name.getColumn())
+
.setKind(name.getKind()).setLength(streamSize));
+ if (StreamName.Area.INDEX == name.getArea()) {
+ indexSize += streamSize;
+ } else {
+ dataSize += streamSize;
+ }
+ }
+ }
+
+ dirEntry.setIndexLength(indexSize).setDataLength(dataSize);
+ OrcProto.StripeFooter footer = footerBuilder.build();
+ // Do we need to pad the file so the stripe doesn't straddle a
block boundary?
+ padStripe(indexSize + dataSize + footer.getSerializedSize());
+
+ // write out the data streams
+ for (Map.Entry<StreamName, BufferedStream> pair :
streams.entrySet()) {
+ pair.getValue().spillToDiskAndClear(out);
+ }
+
+ // Write out the footer.
+ writeStripeFooter(footer, dataSize, indexSize, dirEntry);
+ }
+
+ @Override
+ public void writeFileMetadata(OrcProto.Metadata.Builder builder) throws
IOException {
+ long startPosition = out.getPos();
+ OrcProto.Metadata metadata = builder.build();
+ metadata.writeTo(protobufWriter);
+ protobufWriter.flush();
+ writer.flush();
+ this.metadataLength = (int) (out.getPos() - startPosition);
+ }
+
+ @Override
+ public void writeFileFooter(OrcProto.Footer.Builder builder) throws
IOException {
+ long bodyLength = out.getPos() - metadataLength;
+ builder.setContentLength(bodyLength);
+ builder.setHeaderLength(headerLength);
+ long startPosition = out.getPos();
+ OrcProto.Footer footer = builder.build();
+ footer.writeTo(protobufWriter);
+ protobufWriter.flush();
+ writer.flush();
+ this.footerLength = (int) (out.getPos() - startPosition);
+ }
+
+ @Override
+ public long writePostScript(OrcProto.PostScript.Builder builder) throws
IOException {
+ builder.setFooterLength(footerLength);
+ builder.setMetadataLength(metadataLength);
+
+ OrcProto.PostScript ps = builder.build();
+ // need to write this uncompressed
+ long startPosition = out.getPos();
+ ps.writeTo(out);
+ long length = out.getPos() - startPosition;
+
+ if (length > 255) {
+ throw new IllegalArgumentException("PostScript too
large at " + length);
+ }
+
+ out.write((int) length);
+ return out.getPos();
+ }
+
+ @Override
+ public void close() {
+ // Just release the codec but don't close the internal stream here to avoid
+ // "Stream Closed" or ClosedChannelException when Flink performs a checkpoint.
+ OrcCodecPool.returnCodec(compress, codec);
+ codec = null;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ out.flush();
+ }
+
+ @Override
+ public void appendRawStripe(ByteBuffer buffer,
OrcProto.StripeInformation.Builder dirEntry) throws IOException {
+ long start = out.getPos();
+ int length = buffer.remaining();
+ long availBlockSpace = blockSize - (start % blockSize);
+
+ // see if stripe can fit in the current hdfs block, else pad
the remaining
+ // space in the block
+ if (length < blockSize && length > availBlockSpace &&
+ addBlockPadding) {
+ byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE,
availBlockSpace)];
+ LOG.info(String.format("Padding ORC by %d bytes while
merging..",
+ availBlockSpace));
+ start += availBlockSpace;
+ while (availBlockSpace > 0) {
+ int writeLen = (int) Math.min(availBlockSpace,
pad.length);
+ out.write(pad, 0, writeLen);
+ availBlockSpace -= writeLen;
+ }
+ }
+
+ out.write(buffer.array(), buffer.arrayOffset() +
buffer.position(), length);
+ dirEntry.setOffset(start);
+ }
+
+ @Override
+ public CompressionCodec getCompressionCodec() {
+ return this.codec;
+ }
+
+ @Override
+ public long getFileBytes(int column) {
+ long size = 0;
+
+ for (final Map.Entry<StreamName, BufferedStream> pair:
streams.entrySet()) {
+ final BufferedStream receiver = pair.getValue();
+ if (!receiver.isSuppressed) {
+
+ final StreamName name = pair.getKey();
+ if (name.getColumn() == column &&
name.getArea() != StreamName.Area.INDEX) {
+ size += receiver.getOutputSize();
+ }
+ }
+
+ }
+
+ return size;
+ }
+
+ private void padStripe(long stripeSize) throws IOException {
+ this.stripeStart = out.getPos();
+ long previousBytesInBlock = (stripeStart - blockOffset) %
blockSize;
+
+ // We only have options if this isn't the first stripe in the
block
+ if (previousBytesInBlock > 0) {
+ if (previousBytesInBlock + stripeSize >= blockSize) {
+ // Try making a short block
+ if (writeVariableLengthBlocks &&
+ shims.endVariableLengthBlock(out)) {
+ blockOffset = stripeStart;
+ } else if (addBlockPadding) {
+ // if we cross the block boundary,
figure out what we should do
+ long padding = blockSize -
previousBytesInBlock;
+ if (padding <= maxPadding) {
+ writeZeros(out, padding);
+ stripeStart += padding;
+ }
+ }
+ }
+ }
+ }
+
+ private void writeStripeFooter(OrcProto.StripeFooter footer, long
dataSize,
+ long
indexSize, OrcProto.StripeInformation.Builder dirEntry) throws IOException {
+ footer.writeTo(protobufWriter);
+ protobufWriter.flush();
+ writer.flush();
+
+ dirEntry.setOffset(stripeStart);
+ dirEntry.setFooterLength(out.getPos() - stripeStart - dataSize
- indexSize);
+ }
+
+ private static void writeZeros(OutputStream output, long remaining)
throws IOException {
+ while (remaining > 0) {
+ long size = Math.min(ZEROS.length, remaining);
+ output.write(ZEROS, 0, (int) size);
+ remaining -= size;
+ }
+ }
+
+ private static class DirectStream implements OutputReceiver {
+ private final FSDataOutputStream output;
+
+ DirectStream(FSDataOutputStream output) {
+ this.output = output;
+ }
+
+ public void output(ByteBuffer buffer) throws IOException {
+ this.output.write(buffer.array(), buffer.arrayOffset()
+ buffer.position(), buffer.remaining());
+ }
+
+ public void suppress() {
+ throw new UnsupportedOperationException("Can't suppress
direct stream");
+ }
+ }
+
+ private static final class BufferedStream implements OutputReceiver {
+ private boolean isSuppressed = false;
+ private final List<ByteBuffer> output = new ArrayList<>();
+
+ @Override
+ public void output(ByteBuffer buffer) {
+ if (!isSuppressed) {
+ output.add(buffer);
+ }
+ }
+
+ public void suppress() {
+ isSuppressed = true;
+ output.clear();
+ }
+
+ void spillToDiskAndClear(FSDataOutputStream raw) throws
IOException {
+ if (!isSuppressed) {
+ for (ByteBuffer buffer: output) {
+ raw.write(buffer.array(),
buffer.arrayOffset() + buffer.position(),
+ buffer.remaining());
+ }
+ output.clear();
+ }
+ isSuppressed = false;
+ }
+
+ public long getOutputSize() {
+ long result = 0;
+ for (ByteBuffer buffer: output) {
+ result += buffer.remaining();
+ }
+ return result;
+ }
+ }
+
+}
diff --git
a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/data/Record.java
b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/data/Record.java
new file mode 100644
index 0000000..f5561a09
--- /dev/null
+++
b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/data/Record.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc.data;
+
+import java.io.Serializable;
+
+/**
+ * A sample type used for the integration test case.
+ */
+public class Record implements Serializable {
+ private final String name;
+ private final int age;
+
+ public Record(String name, int age) {
+ this.name = name;
+ this.age = age;
+ }
+
+ public String getName() {
+ return this.name;
+ }
+
+ public int getAge() {
+ return this.age;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+
+ if (!(other instanceof Record)) {
+ return false;
+ }
+
+ Record otherRecord = (Record) other;
+
+ return this.name.equals(otherRecord.getName())
+ && this.age == otherRecord.getAge();
+ }
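+
+ @Override
+ public int hashCode() {
+ // Keep hashCode consistent with equals so Record works correctly in hash-based collections.
+ return 31 * name.hashCode() + age;
+ }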
+}
+
diff --git
a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/util/OrcBulkWriterTestUtil.java
b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/util/OrcBulkWriterTestUtil.java
new file mode 100644
index 0000000..7503a85
--- /dev/null
+++
b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/util/OrcBulkWriterTestUtil.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc.util;
+
+import org.apache.flink.orc.data.Record;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Util class for the OrcBulkWriter tests.
+ */
+public class OrcBulkWriterTestUtil {
+
+ public static final String USER_METADATA_KEY = "userKey";
+ public static final ByteBuffer USER_METADATA_VALUE =
ByteBuffer.wrap("hello".getBytes());
+
+ public static void validate(File files, List<Record> expected) throws
IOException {
+ final File[] buckets = files.listFiles();
+ assertNotNull(buckets);
+ assertEquals(1, buckets.length);
+
+ final File[] partFiles = buckets[0].listFiles();
+ assertNotNull(partFiles);
+
+ for (File partFile : partFiles) {
+ assertTrue(partFile.length() > 0);
+
+ OrcFile.ReaderOptions readerOptions =
OrcFile.readerOptions(new Configuration());
+ Reader reader = OrcFile.createReader(new
org.apache.hadoop.fs.Path(partFile.toURI()), readerOptions);
+
+ assertEquals(3, reader.getNumberOfRows());
+ assertEquals(2,
reader.getSchema().getFieldNames().size());
+ assertSame(CompressionKind.LZ4, reader.getCompressionKind());
+ assertTrue(reader.hasMetadataValue(USER_METADATA_KEY));
+
assertTrue(reader.getMetadataKeys().contains(USER_METADATA_KEY));
+
+ List<Record> results = getResults(reader);
+
+ assertEquals(3, results.size());
+ assertEquals(expected, results);
+ }
+ }
+
+ private static List<Record> getResults(Reader reader) throws
IOException {
+ List<Record> results = new ArrayList<>();
+
+ RecordReader recordReader = reader.rows();
+ VectorizedRowBatch batch = reader.getSchema().createRowBatch();
+
+		while (recordReader.nextBatch(batch)) {
+			BytesColumnVector stringVector = (BytesColumnVector) batch.cols[0];
+			LongColumnVector intVector = (LongColumnVector) batch.cols[1];
+			for (int r = 0; r < batch.size; r++) {
+				String name = new String(stringVector.vector[r], stringVector.start[r], stringVector.length[r]);
+				int age = (int) intVector.vector[r];
+
+				results.add(new Record(name, age));
+			}
+		}
+		recordReader.close();
+
+ return results;
+ }
+}
diff --git
a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/vector/RecordVectorizer.java
b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/vector/RecordVectorizer.java
new file mode 100644
index 0000000..5a0e83f
--- /dev/null
+++
b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/vector/RecordVectorizer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc.vector;
+
+import org.apache.flink.orc.data.Record;
+import org.apache.flink.orc.util.OrcBulkWriterTestUtil;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * A Vectorizer implementation used for tests.
+ *
+ * <p>It transforms an input element which is of type {@link Record}
+ * to a VectorizedRowBatch.
+ */
+public class RecordVectorizer extends Vectorizer<Record> implements
Serializable {
+
+ public RecordVectorizer(String schema) {
+ super(schema);
+ }
+
+ @Override
+ public void vectorize(Record element, VectorizedRowBatch batch) throws
IOException {
+ BytesColumnVector stringVector = (BytesColumnVector)
batch.cols[0];
+ LongColumnVector intColVector = (LongColumnVector)
batch.cols[1];
+
+ int row = batch.size++;
+
+ stringVector.setVal(row,
element.getName().getBytes(StandardCharsets.UTF_8));
+ intColVector.vector[row] = element.getAge();
+
+ this.addUserMetadata(OrcBulkWriterTestUtil.USER_METADATA_KEY,
OrcBulkWriterTestUtil.USER_METADATA_VALUE);
+ }
+
+}
diff --git
a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/writer/OrcBulkWriterITCase.java
b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/writer/OrcBulkWriterITCase.java
new file mode 100644
index 0000000..5dce82e
--- /dev/null
+++
b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/writer/OrcBulkWriterITCase.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc.writer;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.orc.data.Record;
+import org.apache.flink.orc.util.OrcBulkWriterTestUtil;
+import org.apache.flink.orc.vector.RecordVectorizer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.util.FiniteTestSource;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Integration test for writing data in ORC bulk format using
StreamingFileSink.
+ */
+public class OrcBulkWriterITCase extends TestLogger {
+
+ @ClassRule
+ public static final TemporaryFolder TEMPORARY_FOLDER = new
TemporaryFolder();
+
+ private final String schema = "struct<_col0:string,_col1:int>";
+ private final List<Record> testData = Arrays.asList(
+ new Record("Sourav", 41), new Record("Saul", 35), new
Record("Kim", 31));
+
+ @Test
+ public void testOrcBulkWriter() throws Exception {
+ final File outDir = TEMPORARY_FOLDER.newFolder();
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ final Properties writerProps = new Properties();
+ writerProps.setProperty("orc.compress", "LZ4");
+
+ final OrcBulkWriterFactory<Record> factory = new
OrcBulkWriterFactory<>(
+ new RecordVectorizer(schema), writerProps, new
Configuration());
+
+ env.setParallelism(1);
+ env.enableCheckpointing(100);
+
+ DataStream<Record> stream = env.addSource(new
FiniteTestSource<>(testData), TypeInformation.of(Record.class));
+ stream.map(str -> str)
+ .addSink(StreamingFileSink
+ .forBulkFormat(new Path(outDir.toURI()),
factory)
+ .build());
+
+ env.execute();
+
+ OrcBulkWriterTestUtil.validate(outDir, testData);
+ }
+}
diff --git
a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/writer/OrcBulkWriterTest.java
b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/writer/OrcBulkWriterTest.java
new file mode 100644
index 0000000..5a777a7
--- /dev/null
+++
b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/writer/OrcBulkWriterTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc.writer;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.orc.data.Record;
+import org.apache.flink.orc.util.OrcBulkWriterTestUtil;
+import org.apache.flink.orc.vector.RecordVectorizer;
+import
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Unit test for the ORC BulkWriter implementation.
+ */
+public class OrcBulkWriterTest {
+
+ @ClassRule
+ public static final TemporaryFolder TEMPORARY_FOLDER = new
TemporaryFolder();
+
+ private final String schema = "struct<_col0:string,_col1:int>";
+ private final List<Record> input = Arrays.asList(
+ new Record("Shiv", 44), new Record("Jesse", 23), new
Record("Walt", 50));
+
+ @Test
+ public void testOrcBulkWriter() throws Exception {
+ final File outDir = TEMPORARY_FOLDER.newFolder();
+ final Properties writerProps = new Properties();
+ writerProps.setProperty("orc.compress", "LZ4");
+
+ final OrcBulkWriterFactory<Record> writer = new
OrcBulkWriterFactory<>(
+ new RecordVectorizer(schema), writerProps, new
Configuration());
+
+ StreamingFileSink<Record> sink = StreamingFileSink
+ .forBulkFormat(new Path(outDir.toURI()), writer)
+ .withBucketCheckInterval(10000)
+ .build();
+
+ try (OneInputStreamOperatorTestHarness<Record, Object>
testHarness = new OneInputStreamOperatorTestHarness<>(
+ new StreamSink<>(sink), 1, 1, 0)) {
+
+ testHarness.setup();
+ testHarness.open();
+
+ int time = 0;
+ for (final Record record : input) {
+ testHarness.processElement(record, ++time);
+ }
+
+ testHarness.snapshot(1, ++time);
+ testHarness.notifyOfCompletedCheckpoint(1);
+
+ OrcBulkWriterTestUtil.validate(outDir, input);
+ }
+ }
+
+}