[ 
https://issues.apache.org/jira/browse/BEAM-5309?focusedWorklogId=172642&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172642
 ]

ASF GitHub Bot logged work on BEAM-5309:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 06/Dec/18 13:36
            Start Date: 06/Dec/18 13:36
    Worklog Time Spent: 10m 
      Work Description: aromanenko-dev closed pull request #6691: [BEAM-5309] 
Add streaming support for HadoopFormatIO
URL: https://github.com/apache/beam/pull/6691
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/runners/spark/build.gradle b/runners/spark/build.gradle
index 5a7504032490..2604f4b224d8 100644
--- a/runners/spark/build.gradle
+++ b/runners/spark/build.gradle
@@ -29,6 +29,7 @@ description = "Apache Beam :: Runners :: Spark"
  * we are attempting to reference the "sourceSets.test.output" directly.
  */
 evaluationDependsOn(":beam-sdks-java-core")
+evaluationDependsOn(":beam-sdks-java-io-hadoop-format")
 
 configurations {
   validatesRunner
@@ -87,6 +88,8 @@ dependencies {
   shadowTest library.java.jackson_dataformat_yaml
   shadowTest "org.apache.kafka:kafka_2.11:0.11.0.1"
   validatesRunner project(path: ":beam-sdks-java-core", configuration: 
"shadowTest")
+  validatesRunner project(path: ":beam-sdks-java-io-hadoop-format", 
configuration: "shadowTest")
+  validatesRunner project(path: ":beam-examples-java", configuration: 
"shadowTest")
   validatesRunner project(path: project.path, configuration: "shadowTest")
   validatesRunner project(path: project.path, configuration: "shadow")
   validatesRunner project(path: project.path, configuration: "provided")
@@ -114,8 +117,9 @@ task validatesRunnerBatch(type: Test) {
   systemProperty "spark.ui.enabled", "false"
   systemProperty "spark.ui.showConsoleProgress", "false"
 
+
   classpath = configurations.validatesRunner
-  testClassesDirs = 
files(project(":beam-sdks-java-core").sourceSets.test.output.classesDirs) + 
files(project.sourceSets.test.output.classesDirs)
+  testClassesDirs = 
files(project(":beam-sdks-java-core").sourceSets.test.output.classesDirs) + 
files(project(":beam-sdks-java-io-hadoop-format").sourceSets.test.output.classesDirs)
 + files(project.sourceSets.test.output.classesDirs)
   // Only one SparkContext may be running in a JVM (SPARK-2243)
   forkEvery 1
   maxParallelForks 4
diff --git a/sdks/java/io/hadoop-format/build.gradle 
b/sdks/java/io/hadoop-format/build.gradle
new file mode 100644
index 000000000000..e2f31188c924
--- /dev/null
+++ b/sdks/java/io/hadoop-format/build.gradle
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import groovy.json.JsonOutput
+
+apply plugin: org.apache.beam.gradle.BeamModulePlugin
+applyJavaNature()
+provideIntegrationTestingDependencies()
+enableJavaPerformanceTesting()
+
+description = "Apache Beam :: SDKs :: Java :: IO :: Hadoop Format"
+ext.summary = "IO to read data from sources and to write data to sinks that 
implement Hadoop MapReduce Format."
+
+configurations.create("sparkRunner")
+configurations.sparkRunner {
+  // Ban certain dependencies to prevent a StackOverflow within Spark
+  // because JUL -> SLF4J -> JUL, and similarly JDK14 -> SLF4J -> JDK14
+  exclude group: "org.slf4j", module: "jul-to-slf4j"
+  exclude group: "org.slf4j", module: "slf4j-jdk14"
+}
+
+// Ban dependencies from the test runtime classpath
+configurations.testRuntimeClasspath {
+  // Prevent a StackOverflow because of wiring LOG4J -> SLF4J -> LOG4J
+  exclude group: "org.slf4j", module: "log4j-over-slf4j"
+}
+
+dependencies {
+  shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
+  compile library.java.guava
+  shadow library.java.slf4j_api
+  shadow project(path: ":beam-sdks-java-io-hadoop-common", configuration: 
"shadow")
+  provided library.java.hadoop_common
+  provided library.java.hadoop_hdfs
+  provided library.java.hadoop_mapreduce_client_core
+  testCompile project(path: ":beam-runners-direct-java", configuration: 
"shadow")
+  testCompile project(path: ":beam-sdks-java-core", configuration: 
"shadowTest")
+  testCompile project(path: ":beam-sdks-java-io-common", configuration: 
"shadow")
+  testCompile project(path: ":beam-sdks-java-io-common", configuration: 
"shadowTest")
+  testCompile project(path: ":beam-sdks-java-io-jdbc", configuration: "shadow")
+  testCompile project(path: ":beam-sdks-java-io-hadoop-input-format", 
configuration: "shadowTest")
+  testCompile project(path: ":beam-examples-java", configuration: "shadowTest")
+  testCompile library.java.postgres
+  testCompile library.java.slf4j_jdk14
+  testCompile library.java.junit
+  testCompile library.java.hamcrest_core
+  testCompile library.java.hamcrest_library
+  shadow library.java.commons_io_2x
+
+  delegate.add("sparkRunner", project(path: 
":beam-sdks-java-io-hadoop-format", configuration: "shadow"))
+  delegate.add("sparkRunner", project(path: 
":beam-sdks-java-io-hadoop-format", configuration: "shadowTest"))
+
+  sparkRunner project(path: ":beam-examples-java", configuration: "shadowTest")
+  sparkRunner project(path: ":beam-runners-spark", configuration: "shadow")
+  sparkRunner project(path: ":beam-sdks-java-io-hadoop-file-system", 
configuration: "shadow")
+  sparkRunner library.java.spark_streaming
+  sparkRunner library.java.spark_core
+}
+
+def runnerClass = "org.apache.beam.runners.spark.TestSparkRunner"
+task sparkRunner(type: Test) {
+  group = "Verification"
+  def beamTestPipelineOptions = [
+          "--project=hadoop-format",
+          "--tempRoot=/tmp/hadoop-format/",
+          "--streaming=false",
+          "--runner=" + runnerClass,
+          "--enableSparkMetricSinks=false",
+  ]
+  classpath = configurations.sparkRunner
+  include "**/HadoopFormatIOSequenceFileTest.class"
+  useJUnit {
+    includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
+  }
+  forkEvery 1
+  maxParallelForks 4
+  systemProperty "spark.ui.enabled", "false"
+  systemProperty "spark.ui.showConsoleProgress", "false"
+  systemProperty "beam.spark.test.reuseSparkContext", "true"
+  systemProperty "beamTestPipelineOptions", 
JsonOutput.toJson(beamTestPipelineOptions)
+}
diff --git 
a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/ExternalSynchronization.java
 
b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/ExternalSynchronization.java
new file mode 100644
index 000000000000..14e22ca9b15e
--- /dev/null
+++ 
b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/ExternalSynchronization.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hadoop.format;
+
+import java.io.Serializable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+
+/**
+ * Provides mechanism for acquiring locks related to the job. Serves as source 
of unique events
+ * among the job.
+ */
+public interface ExternalSynchronization extends Serializable {
+
+  /**
+   * Tries to acquire lock for given job.
+   *
+   * @param conf configuration bounded with given job.
+   * @return {@code true} if the lock was acquired, {@code false} otherwise.
+   */
+  boolean tryAcquireJobLock(Configuration conf);
+
+  /**
+   * Deletes lock ids bounded with given job if any exists.
+   *
+   * @param conf hadoop configuration of given job.
+   */
+  void releaseJobIdLock(Configuration conf);
+
+  /**
+   * Creates {@link TaskID} with unique id among given job.
+   *
+   * @param conf hadoop configuration of given job.
+   * @return {@link TaskID} with unique id among given job.
+   */
+  TaskID acquireTaskIdLock(Configuration conf);
+
+  /**
+   * Creates unique {@link TaskAttemptID} for given taskId.
+   *
+   * @param conf configuration of given task and job
+   * @param taskId id of the task
+   * @return Unique {@link TaskAttemptID} for given taskId.
+   */
+  TaskAttemptID acquireTaskAttemptIdLock(Configuration conf, int taskId);
+}
diff --git 
a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronization.java
 
b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronization.java
new file mode 100644
index 000000000000..a435a0683d15
--- /dev/null
+++ 
b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronization.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hadoop.format;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of {@link ExternalSynchronization} which registers locks in 
the HDFS.
+ *
+ * <p>Requires {@code locksDir} to be specified. This directory MUST be 
different that directory
+ * which is possibly stored under {@code 
"mapreduce.output.fileoutputformat.outputdir"} key.
+ * Otherwise setup of job will fail because the directory will exist before 
job setup.
+ */
+public class HDFSSynchronization implements ExternalSynchronization {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(HDFSSynchronization.class);
+
+  private static final String LOCKS_DIR_PATTERN = "%s/";
+  private static final String LOCKS_DIR_TASK_PATTERN = LOCKS_DIR_PATTERN + 
"%s";
+  private static final String LOCKS_DIR_TASK_ATTEMPT_PATTERN = 
LOCKS_DIR_TASK_PATTERN + "_%s";
+  private static final String LOCKS_DIR_JOB_FILENAME = LOCKS_DIR_PATTERN + 
"_job";
+
+  private static final transient Random RANDOM_GEN = new Random();
+
+  private final String locksDir;
+  private final ThrowingFunction<Configuration, FileSystem, IOException> 
fileSystemFactory;
+
+  /**
+   * Creates instance of {@link HDFSSynchronization}.
+   *
+   * @param locksDir directory where locks will be stored. This directory MUST 
be different that
+   *     directory which is possibly stored under {@code
+   *     "mapreduce.output.fileoutputformat.outputdir"} key. Otherwise setup 
of job will fail
+   *     because the directory will exist before job setup.
+   */
+  public HDFSSynchronization(String locksDir) {
+    this(locksDir, FileSystem::newInstance);
+  }
+
+  /**
+   * Creates instance of {@link HDFSSynchronization}. Exists only for easier 
testing.
+   *
+   * @param locksDir directory where locks will be stored. This directory MUST 
be different that
+   *     directory which is possibly stored under {@code
+   *     "mapreduce.output.fileoutputformat.outputdir"} key. Otherwise setup 
of job will fail
+   *     because the directory will exist before job setup.
+   * @param fileSystemFactory supplier of the file system
+   */
+  HDFSSynchronization(
+      String locksDir, ThrowingFunction<Configuration, FileSystem, 
IOException> fileSystemFactory) {
+    this.locksDir = locksDir;
+    this.fileSystemFactory = fileSystemFactory;
+  }
+
+  @Override
+  public boolean tryAcquireJobLock(Configuration conf) {
+    Path path = new Path(locksDir, String.format(LOCKS_DIR_JOB_FILENAME, 
getJobJtIdentifier(conf)));
+
+    return tryCreateFile(conf, path);
+  }
+
+  @Override
+  public void releaseJobIdLock(Configuration conf) {
+    Path path = new Path(locksDir, String.format(LOCKS_DIR_PATTERN, 
getJobJtIdentifier(conf)));
+
+    try (FileSystem fileSystem = fileSystemFactory.apply(conf)) {
+      if (fileSystem.delete(path, true)) {
+        LOGGER.info("Delete of lock directory {} was successful", path);
+      } else {
+        LOGGER.warn("Delete of lock directory {} was unsuccessful", path);
+      }
+
+    } catch (IOException e) {
+      String formattedExceptionMessage =
+          String.format("Delete of lock directory %s was unsuccessful", path);
+      LOGGER.warn(formattedExceptionMessage, e);
+      throw new IllegalStateException(formattedExceptionMessage, e);
+    }
+  }
+
+  @Override
+  public TaskID acquireTaskIdLock(Configuration conf) {
+    JobID jobId = HadoopFormats.getJobId(conf);
+    boolean lockAcquired = false;
+    int taskIdCandidate = 0;
+
+    while (!lockAcquired) {
+      taskIdCandidate = RANDOM_GEN.nextInt(Integer.MAX_VALUE);
+      Path path =
+          new Path(
+              locksDir,
+              String.format(LOCKS_DIR_TASK_PATTERN, getJobJtIdentifier(conf), 
taskIdCandidate));
+      lockAcquired = tryCreateFile(conf, path);
+    }
+
+    return HadoopFormats.createTaskID(jobId, taskIdCandidate);
+  }
+
+  @Override
+  public TaskAttemptID acquireTaskAttemptIdLock(Configuration conf, int 
taskId) {
+    String jobJtIdentifier = getJobJtIdentifier(conf);
+    JobID jobId = HadoopFormats.getJobId(conf);
+    int taskAttemptCandidate = 0;
+    boolean taskAttemptAcquired = false;
+
+    while (!taskAttemptAcquired) {
+      taskAttemptCandidate++;
+      Path path =
+          new Path(
+              locksDir,
+              String.format(
+                  LOCKS_DIR_TASK_ATTEMPT_PATTERN, jobJtIdentifier, taskId, 
taskAttemptCandidate));
+      taskAttemptAcquired = tryCreateFile(conf, path);
+    }
+
+    return HadoopFormats.createTaskAttemptID(jobId, taskId, 
taskAttemptCandidate);
+  }
+
+  private boolean tryCreateFile(Configuration conf, Path path) {
+    try (FileSystem fileSystem = fileSystemFactory.apply(conf)) {
+      try {
+        return fileSystem.createNewFile(path);
+      } catch (FileAlreadyExistsException | 
org.apache.hadoop.fs.FileAlreadyExistsException e) {
+        return false;
+      } catch (RemoteException e) {
+        //remote hdfs exception
+        if 
(e.getClassName().equals(AlreadyBeingCreatedException.class.getName())) {
+          return false;
+        }
+        throw e;
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException(String.format("Creation of file on path 
%s failed", path), e);
+    }
+  }
+
+  private String getJobJtIdentifier(Configuration conf) {
+    JobID job =
+        Preconditions.checkNotNull(
+            HadoopFormats.getJobId(conf),
+            "Configuration must contain jobID under key %s.",
+            HadoopFormatIO.JOB_ID);
+    return job.getJtIdentifier();
+  }
+
+  /**
+   * Function which can throw exception.
+   *
+   * @param <T1> parameter type
+   * @param <T2> result type
+   * @param <X> exception type
+   */
+  @FunctionalInterface
+  public interface ThrowingFunction<T1, T2, X extends Exception> extends 
Serializable {
+    T2 apply(T1 value) throws X;
+  }
+}
diff --git 
a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java
 
b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java
new file mode 100644
index 000000000000..409547250a38
--- /dev/null
+++ 
b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java
@@ -0,0 +1,1181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hadoop.format;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.Objects.requireNonNull;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link HadoopFormatIO} is a Transform for writing data to any sink which 
implements Hadoop
+ * {@link OutputFormat}. For example - Cassandra, Elasticsearch, HBase, Redis, 
Postgres etc. {@link
+ * HadoopFormatIO} has to make several performance trade-offs in connecting to 
{@link OutputFormat},
+ * so if there is another Beam IO Transform specifically for connecting to 
your data sink of choice,
+ * we would recommend using that one, but this IO Transform allows you to 
connect to many data sinks
+ * that do not yet have a Beam IO Transform.
+ *
+ * <p>You will need to pass a Hadoop {@link Configuration} with parameters 
specifying how the write
+ * will occur. Many properties of the Configuration are optional, and some are 
required for certain
+ * {@link OutputFormat} classes, but the following properties must be set for 
all OutputFormats:
+ *
+ * <ul>
+ *   <li>{@code mapreduce.job.id}: The identifier of the write job. E.g.: end 
timestamp of window.
+ *   <li>{@code mapreduce.job.outputformat.class}: The {@link OutputFormat} 
class used to connect to
+ *       your data sink of choice.
+ *   <li>{@code mapreduce.job.output.key.class}: The key class passed to the 
{@link OutputFormat} in
+ *       {@code mapreduce.job.outputformat.class}.
+ *   <li>{@code mapreduce.job.output.value.class}: The value class passed to 
the {@link
+ *       OutputFormat} in {@code mapreduce.job.outputformat.class}.
+ *   <li>{@code mapreduce.job.reduces}: Number of reduce tasks. Value is equal 
to number of write
+ *       tasks which will be genarated. This property is not required for 
{@link
+ *       Write.PartitionedWriterBuilder#withoutPartitioning()} write.
+ *   <li>{@code mapreduce.job.partitioner.class}: Hadoop partitioner class 
which will be used for
+ *       distributing of records among partitions. This property is not 
required for {@link
+ *       Write.PartitionedWriterBuilder#withoutPartitioning()} write.
+ * </ul>
+ *
+ * <b>Note:</b> All mentioned values have appropriate constants. E.g.: {@link
+ * #OUTPUT_FORMAT_CLASS_ATTR}.
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * Configuration myHadoopConfiguration = new Configuration(false);
+ * // Set Hadoop OutputFormat, key and value class in configuration
+ * myHadoopConfiguration.setClass(&quot;mapreduce.job.outputformat.class&quot;,
+ *    MyDbOutputFormatClass, OutputFormat.class);
+ * myHadoopConfiguration.setClass(&quot;mapreduce.job.output.key.class&quot;,
+ *    MyDbOutputFormatKeyClass, Object.class);
+ * myHadoopConfiguration.setClass(&quot;mapreduce.job.output.value.class&quot;,
+ *    MyDbOutputFormatValueClass, Object.class);
+ * myHadoopConfiguration.setClass(&quot;mapreduce.job.output.value.class&quot;,
+ *    MyPartitionerClass, Object.class);
+ * myHadoopConfiguration.setInt(&quot;mapreduce.job.reduces&quot;, 2);
+ * }</pre>
+ *
+ * <p>You will need to set OutputFormat key and value class (i.e. 
"mapreduce.job.output.key.class"
+ * and "mapreduce.job.output.value.class") in Hadoop {@link Configuration} 
which are equal to {@code
+ * KeyT} and {@code ValueT}. If you set different OutputFormat key or value 
class than
+ * OutputFormat's actual key or value class then, it will throw {@link 
IllegalArgumentException}
+ *
+ * <h3>Writing using {@link HadoopFormatIO}</h3>
+ *
+ * <h4>Batch writing</h4>
+ *
+ * <pre>{@code
+ * //Data which will we want to write
+ * PCollection<KV<Text, LongWritable>> boundedWordsCount = ...
+ *
+ * //Hadoop configuration for write
+ * //We have partitioned write, so Partitioner and reducers count have to be 
set - see withPartitioning() javadoc
+ * Configuration myHadoopConfiguration = ...
+ * //path to directory with locks
+ * String locksDirPath = ...;
+ *
+ * boundedWordsCount.apply(
+ *     "writeBatch",
+ *     HadoopFormatIO.<Text, LongWritable>write()
+ *         .withConfiguration(myHadoopConfiguration)
+ *         .withPartitioning()
+ *         .withExternalSynchronization(new 
HDFSSynchronization(locksDirPath)));
+ * }</pre>
+ *
+ * <h4>Stream writing</h4>
+ *
+ * <pre>{@code
+ *    // Data which will we want to write
+ *   PCollection<KV<Text, LongWritable>> unboundedWordsCount = ...;
+ *   // Transformation which transforms data of one window into one hadoop 
configuration
+ *   PTransform<PCollection<? extends KV<Text, LongWritable>>, 
PCollectionView<Configuration>>
+ *       configTransform = ...;
+ *
+ *   unboundedWordsCount.apply(
+ *       "writeStream",
+ *       HadoopFormatIO.<Text, LongWritable>write()
+ *           .withConfigurationTransform(configTransform)
+ *           .withExternalSynchronization(new 
HDFSSynchronization(locksDirPath)));
+ * }
+ * }</pre>
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class HadoopFormatIO {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(HadoopFormatIO.class);
+
+  /** {@link MRJobConfig#OUTPUT_FORMAT_CLASS_ATTR}. */
+  public static final String OUTPUT_FORMAT_CLASS_ATTR = 
MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR;
+
+  /** {@link MRJobConfig#OUTPUT_KEY_CLASS}. */
+  public static final String OUTPUT_KEY_CLASS = MRJobConfig.OUTPUT_KEY_CLASS;
+
+  /** {@link MRJobConfig#OUTPUT_VALUE_CLASS}. */
+  public static final String OUTPUT_VALUE_CLASS = 
MRJobConfig.OUTPUT_VALUE_CLASS;
+
+  /** {@link MRJobConfig#NUM_REDUCES}. */
+  public static final String NUM_REDUCES = MRJobConfig.NUM_REDUCES;
+
+  /** {@link MRJobConfig#PARTITIONER_CLASS_ATTR}. */
+  public static final String PARTITIONER_CLASS_ATTR = 
MRJobConfig.PARTITIONER_CLASS_ATTR;
+
+  /** {@link MRJobConfig#ID}. */
+  public static final String JOB_ID = MRJobConfig.ID;
+
+  /** {@link MRJobConfig#MAPREDUCE_JOB_DIR}. */
+  public static final String OUTPUT_DIR = FileOutputFormat.OUTDIR;
+
+  /**
+   * Creates an {@link Write.Builder} for creation of Write Transformation. 
Before creation of the
+   * transformation, chain of builders must be set.
+   *
+   * @param <KeyT> Type of keys to be written.
+   * @param <ValueT> Type of values to be written.
+   * @return Write builder
+   */
+  public static <KeyT, ValueT> Write.WriteBuilder<KeyT, ValueT> write() {
+    return new Write.Builder<>();
+  }
+
+  /**
+   * Generates tasks for output pairs and groups them by this key.
+   *
+   * <p>This transformation is used when is configured write with partitioning.
+   *
+   * @param <KeyT> type of key
+   * @param <ValueT> type of value
+   */
+  private static class GroupDataByPartition<KeyT, ValueT>
+      extends PTransform<
+          PCollection<KV<KeyT, ValueT>>, PCollection<KV<Integer, KV<KeyT, 
ValueT>>>> {
+
+    private PCollectionView<Configuration> configView;
+
+    private GroupDataByPartition(PCollectionView<Configuration> configView) {
+      this.configView = configView;
+    }
+
+    @Override
+    public PCollection<KV<Integer, KV<KeyT, ValueT>>> 
expand(PCollection<KV<KeyT, ValueT>> input) {
+      return input
+          .apply(
+              "AssignTask",
+              ParDo.of(new AssignTaskFn<KeyT, 
ValueT>(configView)).withSideInputs(configView))
+          .setTypeDescriptor(
+              TypeDescriptors.kvs(TypeDescriptors.integers(), 
input.getTypeDescriptor()))
+          .apply("GroupByTaskId", GroupByKey.create())
+          .apply("FlattenGroupedTasks", ParDo.of(new FlattenGroupedTasks<>()));
+    }
+  }
+
+  /**
+   * Flattens grouped iterable {@link KV} pairs into triplets of 
TaskID/Key/Value.
+   *
+   * @param <KeyT> Type of keys to be written.
+   * @param <ValueT> Type of values to be written.
+   */
+  private static class FlattenGroupedTasks<KeyT, ValueT>
+      extends DoFn<KV<Integer, Iterable<KV<KeyT, ValueT>>>, KV<Integer, 
KV<KeyT, ValueT>>> {
+
+    @ProcessElement
+    public void processElement(
+        @Element KV<Integer, Iterable<KV<KeyT, ValueT>>> input,
+        OutputReceiver<KV<Integer, KV<KeyT, ValueT>>> outputReceiver) {
+      final Integer key = input.getKey();
+      for (KV<KeyT, ValueT> element :
+          requireNonNull(input.getValue(), "Iterable can not be null.")) {
+        outputReceiver.output(KV.of(key, element));
+      }
+    }
+  }
+
+  /**
+   * A {@link PTransform} that writes to any data sink which implements Hadoop 
OutputFormat. For
+   * e.g. Cassandra, Elasticsearch, HBase, Redis, Postgres, etc. See the 
class-level Javadoc on
+   * {@link HadoopFormatIO} for more information.
+   *
+   * @param <KeyT> Type of keys to be written.
+   * @param <ValueT> Type of values to be written.
+   * @see HadoopFormatIO
+   */
+  public static class Write<KeyT, ValueT> extends 
PTransform<PCollection<KV<KeyT, ValueT>>, PDone> {
+
+    @Nullable private transient Configuration configuration;
+
+    @Nullable
+    private PTransform<PCollection<? extends KV<KeyT, ValueT>>, 
PCollectionView<Configuration>>
+        configTransform;
+
+    private ExternalSynchronization externalSynchronization;
+
+    private boolean withPartitioning;
+
+    public Write(
+        @Nullable Configuration configuration,
+        @Nullable
+            PTransform<PCollection<? extends KV<KeyT, ValueT>>, 
PCollectionView<Configuration>>
+                configTransform,
+        ExternalSynchronization externalSynchronization,
+        boolean withPartitioning) {
+      this.configuration = configuration;
+      this.configTransform = configTransform;
+      this.externalSynchronization = externalSynchronization;
+      this.withPartitioning = withPartitioning;
+    }
+
+    /**
+     * Builder for partitioning determining.
+     *
+     * @param <KeyT> Key type to write
+     * @param <ValueT> Value type to write
+     */
+    public interface PartitionedWriterBuilder<KeyT, ValueT> {
+
+      /**
+       * Writes to the sink with partitioning by Task Id.
+       *
+       * <p>Following Hadoop configuration properties are required with this 
option:
+       *
+       * <ul>
+       *   <li>{@code mapreduce.job.reduces}: Number of reduce tasks. Value is 
equal to number of
+       *       write tasks which will be genarated.
+       *   <li>{@code mapreduce.job.partitioner.class}: Hadoop partitioner 
class which will be used
+       *       for distributing of records among partitions.
+       * </ul>
+       *
+       * @return WriteBuilder for write transformation
+       */
+      ExternalSynchronizationBuilder<KeyT, ValueT> withPartitioning();
+
+      /**
+       * Writes to the sink without need to partition output into specified 
number of partitions.
+       *
+       * <p>This write operation doesn't do shuffle by the partition so it 
saves transfer time
+       * before write operation itself. As a consequence it generates random 
number of partitions.
+       *
+       * <p><b>Note:</b> Works only for {@link
+       * org.apache.beam.sdk.values.PCollection.IsBounded#BOUNDED} {@link 
PCollection} with global
+       * {@link WindowingStrategy}.
+       *
+       * @return WriteBuilder for write transformation
+       */
+      ExternalSynchronizationBuilder<KeyT, ValueT> withoutPartitioning();
+    }
+
+    /**
+     * Builder for External Synchronization defining.
+     *
+     * @param <KeyT> Key type to write
+     * @param <ValueT> Value type to write
+     */
+    public interface ExternalSynchronizationBuilder<KeyT, ValueT> {
+
+      /**
+       * Specifies class which will provide external synchronization required 
for hadoop write
+       * operation.
+       *
+       * @param externalSynchronization provider of external synchronization
+       * @return Write transformation
+       */
+      Write<KeyT, ValueT> withExternalSynchronization(
+          ExternalSynchronization externalSynchronization);
+    }
+
+    /**
+     * Main builder of Write transformation.
+     *
+     * @param <KeyT> Key type to write
+     * @param <ValueT> Value type to write
+     */
+    public interface WriteBuilder<KeyT, ValueT> {
+
+      /**
+       * Writes to the sink using the options provided by the given hadoop 
configuration.
+       *
+       * <p><b>Note:</b> Works only for {@link
+       * org.apache.beam.sdk.values.PCollection.IsBounded#BOUNDED} {@link 
PCollection} with global
+       * {@link WindowingStrategy}.
+       *
+       * @param config hadoop configuration.
+       * @return WriteBuilder with set configuration
+       * @throws NullPointerException when the configuration is null
+       * @see HadoopFormatIO for required hadoop {@link Configuration} 
properties
+       */
+      PartitionedWriterBuilder<KeyT, ValueT> withConfiguration(Configuration 
config);
+
+      /**
+       * Writes to the sink using configuration created by provided {@code
+       * configurationTransformation}.
+       *
+       * <p>This type is useful especially for processing unbounded windowed 
data but can be used
+       * also for batch processing.
+       *
+       * <p>Supports only {@link PCollection} with {@link DefaultTrigger}ing 
and without allowed
+       * lateness
+       *
+       * @param configTransform configuration transformation interface
+       * @return WriteBuilder with set configuration transformation
+       * @throws NullPointerException when {@code configurationTransformation} 
is {@code null}
+       * @see HadoopFormatIO for required hadoop {@link Configuration} 
properties
+       */
+      ExternalSynchronizationBuilder<KeyT, ValueT> withConfigurationTransform(
+          PTransform<PCollection<? extends KV<KeyT, ValueT>>, 
PCollectionView<Configuration>>
+              configTransform);
+    }
+
+    /**
+     * Implementation of all builders.
+     *
+     * @param <KeyT> Key type to write
+     * @param <ValueT> Value type to write
+     */
+    static class Builder<KeyT, ValueT>
+        implements WriteBuilder<KeyT, ValueT>,
+            PartitionedWriterBuilder<KeyT, ValueT>,
+            ExternalSynchronizationBuilder<KeyT, ValueT> {
+
+      private Configuration configuration;
+      private PTransform<PCollection<? extends KV<KeyT, ValueT>>, 
PCollectionView<Configuration>>
+          configTransform;
+      private ExternalSynchronization externalSynchronization;
+      private boolean isWithPartitioning;
+
+      @Override
+      public PartitionedWriterBuilder<KeyT, ValueT> 
withConfiguration(Configuration config) {
+        checkNotNull(config, "Hadoop configuration cannot be null");
+        this.configuration = config;
+        return this;
+      }
+
+      @Override
+      public ExternalSynchronizationBuilder<KeyT, ValueT> 
withConfigurationTransform(
+          PTransform<PCollection<? extends KV<KeyT, ValueT>>, 
PCollectionView<Configuration>>
+              configTransform) {
+        checkNotNull(configTransform, "Configuration transformation cannot be 
null");
+        this.isWithPartitioning = true;
+        this.configTransform = configTransform;
+        return this;
+      }
+
+      @Override
+      public ExternalSynchronizationBuilder<KeyT, ValueT> withPartitioning() {
+        this.isWithPartitioning = true;
+        return this;
+      }
+
+      @Override
+      public ExternalSynchronizationBuilder<KeyT, ValueT> 
withoutPartitioning() {
+        this.isWithPartitioning = false;
+        return this;
+      }
+
+      @Override
+      public Write<KeyT, ValueT> withExternalSynchronization(
+          ExternalSynchronization externalSynchronization) {
+        checkNotNull(externalSynchronization, "External synchronization cannot 
be null");
+        this.externalSynchronization = externalSynchronization;
+        return new Write<>(
+            this.configuration,
+            this.configTransform,
+            this.externalSynchronization,
+            this.isWithPartitioning);
+      }
+    }
+
+    @Override
+    public void validate(PipelineOptions pipelineOptions) {}
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      Configuration hadoopConfig = configuration;
+      if (hadoopConfig != null) {
+        builder.addIfNotNull(
+            DisplayData.item(OUTPUT_FORMAT_CLASS_ATTR, 
hadoopConfig.get(OUTPUT_FORMAT_CLASS_ATTR))
+                .withLabel("OutputFormat Class"));
+        builder.addIfNotNull(
+            DisplayData.item(OUTPUT_KEY_CLASS, 
hadoopConfig.get(OUTPUT_KEY_CLASS))
+                .withLabel("OutputFormat Key Class"));
+        builder.addIfNotNull(
+            DisplayData.item(OUTPUT_VALUE_CLASS, 
hadoopConfig.get(OUTPUT_VALUE_CLASS))
+                .withLabel("OutputFormat Value Class"));
+        builder.addIfNotNull(
+            DisplayData.item(
+                    PARTITIONER_CLASS_ATTR,
+                    hadoopConfig.get(
+                        PARTITIONER_CLASS_ATTR,
+                        
HadoopFormats.DEFAULT_PARTITIONER_CLASS_ATTR.getName()))
+                .withLabel("Partitioner Class"));
+      }
+    }
+
+    @Override
+    public PDone expand(PCollection<KV<KeyT, ValueT>> input) {
+
+      // streamed pipeline must have defined configuration transformation
+      if (input.isBounded().equals(PCollection.IsBounded.UNBOUNDED)
+          || 
!input.getWindowingStrategy().equals(WindowingStrategy.globalDefault())) {
+        checkArgument(
+            configTransform != null,
+            "Writing of unbounded data can be processed only with 
configuration transformation provider. See %s.withConfigurationTransform()",
+            Write.class);
+      }
+
+      verifyInputWindowing(input);
+
+      TypeDescriptor<Configuration> configType = new 
TypeDescriptor<Configuration>() {};
+      input
+          .getPipeline()
+          .getCoderRegistry()
+          .registerCoderForType(configType, new ConfigurationCoder());
+
+      PCollectionView<Configuration> configView = 
createConfigurationView(input);
+
+      return processJob(input, configView);
+    }
+
+    /**
+     * Processes write job. Write job is composed from following partial steps:
+     *
+     * <ul>
+     *   <li>When partitioning is enabled:
+     *       <ul>
+     *         <li>Assigning of the {@link TaskID} (represented as {@link 
Integer}) to the {@link
+     *             KV}s in {@link AssignTaskFn}
+     *         <li>Grouping {@link KV}s by the {@link TaskID}
+     *       </ul>
+     *   <li>Otherwise creation of TaskId via {@link 
PrepareNonPartitionedTasksFn} where locks are
+     *       created for each task id
+     *   <li>Writing of {@link KV} records via {@link WriteFn}
+     *   <li>Committing of whole job via {@link CommitJobFn}
+     * </ul>
+     *
+     * @param input Collection with output data to write
+     * @param configView configuration view
+     * @return Successfully processed write
+     */
+    private PDone processJob(
+        PCollection<KV<KeyT, ValueT>> input, PCollectionView<Configuration> 
configView) {
+
+      TypeDescriptor<Iterable<Integer>> iterableIntType =
+          TypeDescriptors.iterables(TypeDescriptors.integers());
+
+      PCollection<KV<KeyT, ValueT>> validatedInput =
+          input.apply(
+              ParDo.of(
+                      new SetupJobFn<>(
+                          externalSynchronization, configView, 
input.getTypeDescriptor()))
+                  .withSideInputs(configView));
+
+      PCollection<KV<Integer, KV<KeyT, ValueT>>> writeData =
+          withPartitioning
+              ? validatedInput.apply("GroupDataByPartition", new 
GroupDataByPartition<>(configView))
+              : validatedInput.apply(
+                  "PrepareNonPartitionedTasks",
+                  ParDo.of(
+                          new PrepareNonPartitionedTasksFn<KeyT, ValueT>(
+                              configView, externalSynchronization))
+                      .withSideInputs(configView));
+
+      PCollection<Iterable<Integer>> collectedFinishedWrites =
+          writeData
+              .apply(
+                  "Write",
+                  ParDo.of(new WriteFn<KeyT, ValueT>(configView, 
externalSynchronization))
+                      .withSideInputs(configView))
+              .setTypeDescriptor(TypeDescriptors.integers())
+              .apply(
+                  "CollectWriteTasks",
+                  Combine.globally(new 
IterableCombinerFn<>(TypeDescriptors.integers()))
+                      .withoutDefaults())
+              .setTypeDescriptor(iterableIntType);
+
+      return PDone.in(
+          collectedFinishedWrites
+              .apply(
+                  "CommitWriteJob",
+                  ParDo.of(new CommitJobFn<Integer>(configView, 
externalSynchronization))
+                      .withSideInputs(configView))
+              .getPipeline());
+    }
+
+    private void verifyInputWindowing(PCollection<KV<KeyT, ValueT>> input) {
+      if (input.isBounded().equals(PCollection.IsBounded.UNBOUNDED)) {
+        checkArgument(
+            
!input.getWindowingStrategy().equals(WindowingStrategy.globalDefault()),
+            "Cannot work with %s and GLOBAL %s",
+            PCollection.IsBounded.UNBOUNDED,
+            WindowingStrategy.class.getSimpleName());
+        checkArgument(
+            
input.getWindowingStrategy().getTrigger().getClass().equals(DefaultTrigger.class),
+            "Cannot work with %s trigger. Write works correctly only with %s",
+            
input.getWindowingStrategy().getTrigger().getClass().getSimpleName(),
+            DefaultTrigger.class.getSimpleName());
+        checkArgument(
+            
input.getWindowingStrategy().getAllowedLateness().equals(Duration.ZERO),
+            "Write does not allow late data.");
+      }
+    }
+
+    /**
+     * Creates {@link PCollectionView} with one {@link Configuration} based on 
the set source of the
+     * configuration.
+     *
+     * @param input input data
+     * @return PCollectionView with single {@link Configuration}
+     * @see Builder#withConfiguration(Configuration)
+     * @see Builder#withConfigurationTransform(PTransform)
+     */
+    private PCollectionView<Configuration> createConfigurationView(
+        PCollection<KV<KeyT, ValueT>> input) {
+
+      PCollectionView<Configuration> config;
+      if (configuration != null) {
+        config =
+            input
+                .getPipeline()
+                .apply("CreateOutputConfig", 
Create.<Configuration>of(configuration))
+                
.apply(View.<Configuration>asSingleton().withDefaultValue(configuration));
+      } else {
+        config = input.apply("TransformDataIntoConfig", configTransform);
+      }
+
+      return config;
+    }
+  }
+
+  /**
+   * Represents context of one hadoop write task.
+   *
+   * @param <KeyT> Key type to write
+   * @param <ValueT> Value type to write
+   */
+  private static class TaskContext<KeyT, ValueT> {
+
+    private RecordWriter<KeyT, ValueT> recordWriter;
+    private OutputCommitter outputCommitter;
+    private OutputFormat<KeyT, ValueT> outputFormatObj;
+    private TaskAttemptContext taskAttemptContext;
+
+    TaskContext(TaskAttemptID taskAttempt, Configuration conf) {
+      taskAttemptContext = HadoopFormats.createTaskAttemptContext(conf, 
taskAttempt);
+      outputFormatObj = HadoopFormats.createOutputFormatFromConfig(conf);
+      outputCommitter = initOutputCommitter(outputFormatObj, conf, 
taskAttemptContext);
+      recordWriter = initRecordWriter(outputFormatObj, taskAttemptContext);
+    }
+
+    RecordWriter<KeyT, ValueT> getRecordWriter() {
+      return recordWriter;
+    }
+
+    OutputCommitter getOutputCommitter() {
+      return outputCommitter;
+    }
+
+    TaskAttemptContext getTaskAttemptContext() {
+      return taskAttemptContext;
+    }
+
+    int getTaskId() {
+      return taskAttemptContext.getTaskAttemptID().getTaskID().getId();
+    }
+
+    String getJobId() {
+      return taskAttemptContext.getJobID().getJtIdentifier();
+    }
+
+    void abortTask() {
+      try {
+        outputCommitter.abortTask(taskAttemptContext);
+      } catch (IOException e) {
+        throw new IllegalStateException(
+            String.format("Unable to abort task %s of job %s", getTaskId(), 
getJobId()));
+      }
+    }
+
+    private RecordWriter<KeyT, ValueT> initRecordWriter(
+        OutputFormat<KeyT, ValueT> outputFormatObj, TaskAttemptContext 
taskAttemptContext)
+        throws IllegalStateException {
+      try {
+        LOGGER.info(
+            "Creating new RecordWriter for task {} of Job with id {}.",
+            taskAttemptContext.getTaskAttemptID().getTaskID().getId(),
+            taskAttemptContext.getJobID().getJtIdentifier());
+        return outputFormatObj.getRecordWriter(taskAttemptContext);
+      } catch (InterruptedException | IOException e) {
+        throw new IllegalStateException("Unable to create RecordWriter object: 
", e);
+      }
+    }
+
+    private static OutputCommitter initOutputCommitter(
+        OutputFormat<?, ?> outputFormatObj,
+        Configuration conf,
+        TaskAttemptContext taskAttemptContext)
+        throws IllegalStateException {
+      OutputCommitter outputCommitter;
+      try {
+        outputCommitter = 
outputFormatObj.getOutputCommitter(taskAttemptContext);
+        if (outputCommitter != null) {
+          outputCommitter.setupJob(new JobContextImpl(conf, 
taskAttemptContext.getJobID()));
+        }
+      } catch (Exception e) {
+        throw new IllegalStateException("Unable to create OutputCommitter 
object: ", e);
+      }
+
+      return outputCommitter;
+    }
+
+    @Override
+    public String toString() {
+      return "TaskContext{"
+          + "jobId="
+          + getJobId()
+          + ", taskId="
+          + getTaskId()
+          + ", attemptId="
+          + taskAttemptContext.getTaskAttemptID().getId()
+          + '}';
+    }
+  }
+
+  /** Coder of configuration instances. */
+  private static class ConfigurationCoder extends AtomicCoder<Configuration> {
+
+    @Override
+    public void encode(Configuration value, OutputStream outStream) throws 
IOException {
+      DataOutputStream dataOutputStream = new DataOutputStream(outStream);
+      value.write(dataOutputStream);
+      dataOutputStream.flush();
+    }
+
+    @Override
+    public Configuration decode(InputStream inStream) throws IOException {
+      DataInputStream dataInputStream = new DataInputStream(inStream);
+      Configuration config = new Configuration(false);
+      config.readFields(dataInputStream);
+
+      return config;
+    }
+  }
+
+  /**
+   * DoFn with following responsibilities:
+   *
+   * <ul>
+   *   <li>Validates configuration - checks whether all required properties 
are set.
+   *   <li>Validates types of input PCollection elements.
+   *   <li>Setups start of the {@link OutputFormat} job for given window.
+   * </ul>
+   *
+   * <p>Logic of the setup job is called only for the very first element of 
the instance. All other
+   * elements are directly sent to next processing.
+   */
+  private static class SetupJobFn<KeyT, ValueT> extends DoFn<KV<KeyT, ValueT>, 
KV<KeyT, ValueT>> {
+
+    private ExternalSynchronization externalSynchronization;
+    private PCollectionView<Configuration> configView;
+    private TypeDescriptor<KV<KeyT, ValueT>> inputTypeDescriptor;
+    private boolean isSetupJobAttempted;
+
+    SetupJobFn(
+        ExternalSynchronization externalSynchronization,
+        PCollectionView<Configuration> configView,
+        TypeDescriptor<KV<KeyT, ValueT>> inputTypeDescriptor) {
+      this.externalSynchronization = externalSynchronization;
+      this.configView = configView;
+      this.inputTypeDescriptor = inputTypeDescriptor;
+    }
+
+    @Setup
+    public void setup() {
+      isSetupJobAttempted = false;
+    }
+
+    @DoFn.ProcessElement
+    public void processElement(
+        @DoFn.Element KV<KeyT, ValueT> element,
+        OutputReceiver<KV<KeyT, ValueT>> receiver,
+        BoundedWindow window,
+        ProcessContext c) {
+
+      receiver.output(element);
+
+      if (isSetupJobAttempted) {
+        // setup of job was already attempted
+        return;
+      }
+
+      Configuration conf = c.sideInput(configView);
+
+      // validate configuration and input
+      // must be done first, because in all later operations are required 
assumptions from
+      // validation
+      validateConfiguration(conf);
+      validateInputData(conf);
+
+      boolean isJobLockAcquired = 
externalSynchronization.tryAcquireJobLock(conf);
+      isSetupJobAttempted = true;
+
+      if (!isJobLockAcquired) {
+        // some parallel execution acquired task
+        return;
+      }
+
+      try {
+        // setup job
+        JobID jobId = HadoopFormats.getJobId(conf);
+
+        trySetupJob(jobId, conf, window);
+
+      } catch (Exception e) {
+        throw new IllegalStateException(e);
+      }
+    }
+
+    /**
+     * Validates that the mandatory configuration properties such as 
OutputFormat class,
+     * OutputFormat key and value classes are provided in the Hadoop 
configuration.
+     */
+    private void validateConfiguration(Configuration conf) {
+
+      checkArgument(conf != null, "Configuration can not be null");
+      checkArgument(
+          conf.get(OUTPUT_FORMAT_CLASS_ATTR) != null,
+          "Configuration must contain \"" + OUTPUT_FORMAT_CLASS_ATTR + "\"");
+      checkArgument(
+          conf.get(OUTPUT_KEY_CLASS) != null,
+          "Configuration must contain \"" + OUTPUT_KEY_CLASS + "\"");
+      checkArgument(
+          conf.get(OUTPUT_VALUE_CLASS) != null,
+          "Configuration must contain \"" + OUTPUT_VALUE_CLASS + "\"");
+      checkArgument(conf.get(JOB_ID) != null, "Configuration must contain \"" 
+ JOB_ID + "\"");
+    }
+
+    /**
+     * Validates input data whether have correctly specified {@link 
TypeDescriptor}s of input data
+     * and if the {@link TypeDescriptor}s match with output types set in the 
hadoop {@link
+     * Configuration}.
+     *
+     * @param conf hadoop config
+     */
+    @SuppressWarnings("unchecked")
+    private void validateInputData(Configuration conf) {
+      TypeDescriptor<KeyT> outputFormatKeyClass =
+          (TypeDescriptor<KeyT>) 
TypeDescriptor.of(conf.getClass(OUTPUT_KEY_CLASS, null));
+      TypeDescriptor<ValueT> outputFormatValueClass =
+          (TypeDescriptor<ValueT>) 
TypeDescriptor.of(conf.getClass(OUTPUT_VALUE_CLASS, null));
+
+      checkArgument(
+          inputTypeDescriptor != null,
+          "Input %s must be set!",
+          TypeDescriptor.class.getSimpleName());
+      checkArgument(
+          KV.class.equals(inputTypeDescriptor.getRawType()),
+          "%s expects %s as input type.",
+          Write.class.getSimpleName(),
+          KV.class);
+      checkArgument(
+          inputTypeDescriptor.equals(
+              TypeDescriptors.kvs(outputFormatKeyClass, 
outputFormatValueClass)),
+          "%s expects following %ss: KV(Key: %s, Value: %s) but following %ss 
are set: KV(Key: %s, Value: %s)",
+          Write.class.getSimpleName(),
+          TypeDescriptor.class.getSimpleName(),
+          outputFormatKeyClass.getRawType(),
+          outputFormatValueClass.getRawType(),
+          TypeDescriptor.class.getSimpleName(),
+          inputTypeDescriptor.resolveType(KV.class.getTypeParameters()[0]),
+          inputTypeDescriptor.resolveType(KV.class.getTypeParameters()[1]));
+    }
+
+    /**
+     * Setups the hadoop write job as running. There is possibility that some 
parallel worker
+     * already did setup. in this case {@link FileAlreadyExistsException} is 
catch.
+     *
+     * @param jobId jobId
+     * @param conf hadoop configuration
+     * @param window window
+     */
+    private void trySetupJob(JobID jobId, Configuration conf, BoundedWindow 
window) {
+      try {
+        TaskAttemptContext setupTaskContext = 
HadoopFormats.createSetupTaskContext(conf, jobId);
+        OutputFormat<?, ?> jobOutputFormat = 
HadoopFormats.createOutputFormatFromConfig(conf);
+
+        jobOutputFormat.checkOutputSpecs(setupTaskContext);
+        
jobOutputFormat.getOutputCommitter(setupTaskContext).setupJob(setupTaskContext);
+
+        LOGGER.info(
+            "Job with id {} successfully configured from window with max 
timestamp {}.",
+            jobId.getJtIdentifier(),
+            window.maxTimestamp());
+
+      } catch (FileAlreadyExistsException e) {
+        LOGGER.info("Job was already set by other worker. Skipping rest of the 
setup.");
+      } catch (Exception e) {
+        throw new RuntimeException("Unable to setup job.", e);
+      }
+    }
+  }
+
+  /**
+   * Commits whole write job.
+   *
+   * @param <T> type of TaskId identifier
+   */
+  private static class CommitJobFn<T> extends DoFn<Iterable<T>, Void> {
+
+    private PCollectionView<Configuration> configView;
+    private ExternalSynchronization externalSynchronization;
+
+    CommitJobFn(
+        PCollectionView<Configuration> configView,
+        ExternalSynchronization externalSynchronization) {
+      this.configView = configView;
+      this.externalSynchronization = externalSynchronization;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+
+      Configuration config = c.sideInput(configView);
+      cleanupJob(config);
+    }
+
+    /**
+     * Commits whole write job.
+     *
+     * @param config hadoop config
+     */
+    private void cleanupJob(Configuration config) {
+
+      externalSynchronization.releaseJobIdLock(config);
+
+      JobID jobID = HadoopFormats.getJobId(config);
+      TaskAttemptContext cleanupTaskContext = 
HadoopFormats.createCleanupTaskContext(config, jobID);
+      OutputFormat<?, ?> outputFormat = 
HadoopFormats.createOutputFormatFromConfig(config);
+      try {
+        OutputCommitter outputCommitter = 
outputFormat.getOutputCommitter(cleanupTaskContext);
+        outputCommitter.commitJob(cleanupTaskContext);
+      } catch (Exception e) {
+        throw new RuntimeException("Unable to commit job.", e);
+      }
+    }
+  }
+
+  /**
+   * Assigns {@link TaskID#getId()} to the given pair of key and value. {@link 
TaskID} is later used
+   * for writing the pair to hadoop file.
+   *
+   * @param <KeyT> Type of key
+   * @param <ValueT> Type of value
+   */
+  private static class AssignTaskFn<KeyT, ValueT>
+      extends DoFn<KV<KeyT, ValueT>, KV<Integer, KV<KeyT, ValueT>>> {
+
+    private PCollectionView<Configuration> configView;
+
+    // Transient properties because they are used only for one bundle
+    /** Cache of created TaskIDs for given bundle. */
+    private transient Map<Integer, TaskID> partitionToTaskContext;
+
+    private transient Partitioner<KeyT, ValueT> partitioner;
+    private transient Integer reducersCount;
+    private transient JobID jobId;
+
+    /**
+     * Needs configuration view of given window.
+     *
+     * @param configView configuration view
+     */
+    AssignTaskFn(PCollectionView<Configuration> configView) {
+      this.configView = configView;
+    }
+
+    /** Deletes cached fields used in previous bundle. */
+    @StartBundle
+    public void startBundle() {
+      partitionToTaskContext = new HashMap<>();
+      partitioner = null;
+      jobId = null;
+      reducersCount = null;
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element KV<KeyT, ValueT> element,
+        OutputReceiver<KV<Integer, KV<KeyT, ValueT>>> receiver,
+        ProcessContext c) {
+
+      Configuration config = c.sideInput(configView);
+
+      TaskID taskID = createTaskIDForKV(element, config);
+      int taskId = taskID.getId();
+      receiver.output(KV.of(taskId, element));
+    }
+
+    /**
+     * Creates or reuses existing {@link TaskID} for given record.
+     *
+     * <p>The {@link TaskID} creation is based on the calculation hash 
function of {@code KeyT} of
+     * the pair via {@link Partitioner} (stored in configuration)
+     *
+     * @param kv keyvalue pair which should be written
+     * @param config hadoop configuration
+     * @return TaskID assigned to given record
+     */
+    private TaskID createTaskIDForKV(KV<KeyT, ValueT> kv, Configuration 
config) {
+      int taskContextKey =
+          getPartitioner(config).getPartition(kv.getKey(), kv.getValue(), 
getReducersCount(config));
+
+      return partitionToTaskContext.computeIfAbsent(
+          taskContextKey, (key) -> 
HadoopFormats.createTaskID(getJobId(config), key));
+    }
+
+    private JobID getJobId(Configuration config) {
+      if (jobId == null) {
+        jobId = HadoopFormats.getJobId(config);
+      }
+      return jobId;
+    }
+
+    private int getReducersCount(Configuration config) {
+      if (reducersCount == null) {
+        reducersCount = HadoopFormats.getReducersCount(config);
+      }
+      return reducersCount;
+    }
+
+    private Partitioner<KeyT, ValueT> getPartitioner(Configuration config) {
+      if (partitioner == null) {
+        partitioner = HadoopFormats.getPartitioner(config);
+      }
+      return partitioner;
+    }
+  }
+
+  /**
+   * Writes all {@link KV}s pair for given {@link TaskID} (Task Id determines 
partition of writing).
+   *
+   * <p>For every {@link TaskID} are executed following steps:
+   *
+   * <ul>
+   *   <li>Creation of {@link TaskContext} on start of bundle
+   *   <li>Writing every single {@link KV} pair via {@link RecordWriter}.
+   *   <li>Committing of task on bundle finish
+   * </ul>
+   *
+   * @param <KeyT> Type of key
+   * @param <ValueT> Type of value
+   */
+  private static class WriteFn<KeyT, ValueT> extends DoFn<KV<Integer, KV<KeyT, 
ValueT>>, Integer> {
+
+    private final PCollectionView<Configuration> configView;
+    private final ExternalSynchronization externalSynchronization;
+
+    // Transient property because they are used only for one bundle
+    /** Key by combination of Window and TaskId because different windows can 
use same TaskId. */
+    private transient Map<KV<BoundedWindow, Integer>, TaskContext<KeyT, 
ValueT>>
+        bundleTaskContextMap;
+
+    WriteFn(
+        PCollectionView<Configuration> configView,
+        ExternalSynchronization externalSynchronization) {
+      this.configView = configView;
+      this.externalSynchronization = externalSynchronization;
+    }
+
+    /** Deletes cached map from previous bundle. */
+    @StartBundle
+    public void startBundle() {
+      bundleTaskContextMap = new HashMap<>();
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element KV<Integer, KV<KeyT, ValueT>> element, ProcessContext c, 
BoundedWindow b) {
+
+      Integer taskID = element.getKey();
+      KV<BoundedWindow, Integer> win2TaskId = KV.of(b, taskID);
+
+      TaskContext<KeyT, ValueT> taskContext =
+          bundleTaskContextMap.computeIfAbsent(win2TaskId, w2t -> 
setupTask(w2t.getValue(), c));
+
+      write(element.getValue(), taskContext);
+    }
+
+    @FinishBundle
+    public void finishBundle(FinishBundleContext c) {
+      if (bundleTaskContextMap == null) {
+        return;
+      }
+
+      for (Map.Entry<KV<BoundedWindow, Integer>, TaskContext<KeyT, ValueT>> 
entry :
+          bundleTaskContextMap.entrySet()) {
+        TaskContext<KeyT, ValueT> taskContext = entry.getValue();
+
+        try {
+          
taskContext.getRecordWriter().close(taskContext.getTaskAttemptContext());
+          
taskContext.getOutputCommitter().commitTask(taskContext.getTaskAttemptContext());
+
+          LOGGER.info("Write task for {} was successfully committed!", 
taskContext);
+        } catch (Exception e) {
+          processTaskException(taskContext, e);
+        }
+
+        BoundedWindow window = entry.getKey().getKey();
+        c.output(taskContext.getTaskId(), 
Objects.requireNonNull(window).maxTimestamp(), window);
+      }
+    }
+
+    private void processTaskException(TaskContext<KeyT, ValueT> taskContext, 
Exception e) {
+      LOGGER.warn("Write task for {} failed. Will abort task.", taskContext);
+      taskContext.abortTask();
+      throw new IllegalArgumentException(e);
+    }
+
+    /**
+     * Writes one {@link KV} pair for given {@link TaskID}.
+     *
+     * @param kv Iterable of pairs to write
+     * @param taskContext taskContext
+     */
+    private void write(KV<KeyT, ValueT> kv, TaskContext<KeyT, ValueT> 
taskContext) {
+
+      try {
+        RecordWriter<KeyT, ValueT> recordWriter = 
taskContext.getRecordWriter();
+        recordWriter.write(kv.getKey(), kv.getValue());
+      } catch (Exception e) {
+        processTaskException(taskContext, e);
+      }
+    }
+
+    /**
+     * Creates {@link TaskContext} and setups write for given {@code taskId}.
+     *
+     * @param taskId id of the write Task
+     * @param c process context
+     * @return created TaskContext
+     * @throws IllegalStateException if the setup of the write task failed
+     */
+    private TaskContext<KeyT, ValueT> setupTask(Integer taskId, ProcessContext 
c)
+        throws IllegalStateException {
+
+      final Configuration conf = c.sideInput(configView);
+      TaskAttemptID taskAttemptID = 
externalSynchronization.acquireTaskAttemptIdLock(conf, taskId);
+
+      TaskContext<KeyT, ValueT> taskContext = new TaskContext<>(taskAttemptID, 
conf);
+
+      try {
+        
taskContext.getOutputCommitter().setupTask(taskContext.getTaskAttemptContext());
+      } catch (Exception e) {
+        processTaskException(taskContext, e);
+      }
+
+      LOGGER.info(
+          "Task with id {} of job {} was successfully setup!",
+          taskId,
+          HadoopFormats.getJobId(conf).getJtIdentifier());
+
+      return taskContext;
+    }
+  }
+
+  /**
+   * Registers task ID for each bundle without need to group data by taskId. 
Every bundle reserves
+   * its own taskId via particular implementation of {@link 
ExternalSynchronization} class.
+   *
+   * @param <KeyT> Type of keys to be written.
+   * @param <ValueT> Type of values to be written.
+   */
+  private static class PrepareNonPartitionedTasksFn<KeyT, ValueT>
+      extends DoFn<KV<KeyT, ValueT>, KV<Integer, KV<KeyT, ValueT>>> {
+
+    private transient TaskID taskId;
+
+    private final PCollectionView<Configuration> configView;
+    private final ExternalSynchronization externalSynchronization;
+
+    private PrepareNonPartitionedTasksFn(
+        PCollectionView<Configuration> configView,
+        ExternalSynchronization externalSynchronization) {
+      this.configView = configView;
+      this.externalSynchronization = externalSynchronization;
+    }
+
+    @DoFn.StartBundle
+    public void startBundle() {
+      taskId = null;
+    }
+
+    @DoFn.ProcessElement
+    public void processElement(
+        @DoFn.Element KV<KeyT, ValueT> element,
+        OutputReceiver<KV<Integer, KV<KeyT, ValueT>>> output,
+        ProcessContext c) {
+
+      if (taskId == null) {
+        Configuration conf = c.sideInput(configView);
+        taskId = externalSynchronization.acquireTaskIdLock(conf);
+      }
+
+      output.output(KV.of(taskId.getId(), element));
+    }
+  }
+}
diff --git 
a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormats.java
 
b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormats.java
new file mode 100644
index 000000000000..217831b2f83e
--- /dev/null
+++ 
b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormats.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hadoop.format;
+
+import com.google.common.base.Preconditions;
+import java.lang.reflect.InvocationTargetException;
+import java.util.UUID;
+import javax.annotation.Nullable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+/** Utility class for working with Hadoop related objects. */
+final class HadoopFormats {
+
+  private static final int DEFAULT_JOB_NUMBER = 0;
+  static final Class<HashPartitioner> DEFAULT_PARTITIONER_CLASS_ATTR = 
HashPartitioner.class;
+  static final int DEFAULT_NUM_REDUCERS = 1;
+
+  private HadoopFormats() {}
+
+  /**
+   * Creates {@link JobID} with random jtIdentifier and default job number.
+   *
+   * @return new {@link JobID}
+   */
+  public static JobID createJobId() {
+    return new JobID(UUID.randomUUID().toString(), DEFAULT_JOB_NUMBER);
+  }
+
+  /**
+   * Creates {@link JobID} with specified jtIdentifier and default job number.
+   *
+   * @param jtIdentifier jtIdentifier to specify
+   * @return new {@link JobID}
+   */
+  public static JobID createJobId(String jtIdentifier) {
+    return new JobID(jtIdentifier, DEFAULT_JOB_NUMBER);
+  }
+
+  /**
+   * Creates new setup {@link TaskAttemptContext} from hadoop {@link 
Configuration} and {@link
+   * JobID}.
+   *
+   * @param conf hadoop {@link Configuration}
+   * @param jobID jobId of the created {@link TaskAttemptContext}
+   * @return new setup {@link TaskAttemptContext}
+   */
+  static TaskAttemptContext createSetupTaskContext(Configuration conf, JobID 
jobID) {
+    final TaskID taskId = new TaskID(jobID, TaskType.JOB_SETUP, 0);
+    return createTaskAttemptContext(conf, new TaskAttemptID(taskId, 0));
+  }
+
+  /**
+   * Creates new {@link TaskAttemptContext} from hadoop {@link Configuration}, 
{@link JobID} and
+   * specified taskNumber.
+   *
+   * @param conf hadoop {@link Configuration}
+   * @param jobID jobId of the created {@link TaskAttemptContext}
+   * @param taskNumber number of the task (should be unique across one job)
+   * @return new {@link TaskAttemptContext}
+   */
+  static TaskAttemptContext createTaskAttemptContext(
+      Configuration conf, JobID jobID, int taskNumber) {
+    TaskAttemptID taskAttemptID = createTaskAttemptID(jobID, taskNumber, 0);
+    return createTaskAttemptContext(conf, taskAttemptID);
+  }
+
+  /**
+   * Creates {@link TaskAttemptContext}.
+   *
+   * @param conf cofniguration
+   * @param taskAttemptID taskAttemptId
+   * @return new {@link TaskAttemptContext}
+   */
+  static TaskAttemptContext createTaskAttemptContext(
+      Configuration conf, TaskAttemptID taskAttemptID) {
+    return new TaskAttemptContextImpl(conf, taskAttemptID);
+  }
+
+  /**
+   * Creates new {@link TaskAttemptID}.
+   *
+   * @param jobID jobId
+   * @param taskId taskId
+   * @param attemptId attemptId
+   * @return new {@link TaskAttemptID}
+   */
+  static TaskAttemptID createTaskAttemptID(JobID jobID, int taskId, int 
attemptId) {
+    final TaskID tId = createTaskID(jobID, taskId);
+    return new TaskAttemptID(tId, attemptId);
+  }
+
+  /**
+   * Creates new {@link TaskID} with specified {@code taskNumber} for given 
{@link JobID}.
+   *
+   * @param jobID jobId of the created {@link TaskID}
+   * @param taskNumber number of the task (should be unique across one job)
+   * @return new {@link TaskID} for given {@link JobID}
+   */
+  static TaskID createTaskID(JobID jobID, int taskNumber) {
+    return new TaskID(jobID, TaskType.REDUCE, taskNumber);
+  }
+
+  /**
+   * Creates cleanup {@link TaskAttemptContext} for given {@link JobID}.
+   *
+   * @param conf hadoop configuration
+   * @param jobID jobId of the created {@link TaskID}
+   * @return new cleanup {@link TaskID} for given {@link JobID}
+   */
+  static TaskAttemptContext createCleanupTaskContext(Configuration conf, JobID 
jobID) {
+    final TaskID taskId = new TaskID(jobID, TaskType.JOB_CLEANUP, 0);
+    return createTaskAttemptContext(conf, new TaskAttemptID(taskId, 0));
+  }
+
+  /**
+   * Returns instance of {@link OutputFormat} by class name stored in the 
configuration under key
+   * {@link MRJobConfig#OUTPUT_FORMAT_CLASS_ATTR}.
+   *
+   * @param conf Hadoop configuration
+   * @param <KeyT> KeyType of output format
+   * @param <ValueT> ValueType of output format
+   * @return OutputFormatter
+   * @throws IllegalArgumentException if particular key was not found in the 
config or Formatter was
+   *     unable to construct.
+   */
+  @SuppressWarnings("unchecked")
+  static <KeyT, ValueT> OutputFormat<KeyT, ValueT> 
createOutputFormatFromConfig(Configuration conf)
+      throws IllegalArgumentException {
+    return (OutputFormat<KeyT, ValueT>)
+        createInstanceFromConfig(
+            conf, MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, null, 
OutputFormat.class);
+  }
+
+  /**
+   * Creates new instance of {@link Partitioner} by class specified in hadoop 
{@link Configuration}.
+   *
+   * @param conf hadoop Configuration
+   * @param <KeyT> KeyType of {@link Partitioner}
+   * @param <ValueT> ValueTYpe of {@link Partitioner}
+   * @return new {@link Partitioner}
+   */
+  @SuppressWarnings("unchecked")
+  static <KeyT, ValueT> Partitioner<KeyT, ValueT> getPartitioner(Configuration 
conf) {
+    return (Partitioner<KeyT, ValueT>)
+        createInstanceFromConfig(
+            conf,
+            MRJobConfig.PARTITIONER_CLASS_ATTR,
+            DEFAULT_PARTITIONER_CLASS_ATTR,
+            Partitioner.class);
+  }
+
+  /**
+   * Creates object from class specified in the configuration under specified 
{@code
+   * configClassKey}.
+   *
+   * @param conf hadoop Configuration where is stored class name of returned 
object
+   * @param configClassKey key for class name
+   * @param defaultClass Default class if any result was not found under 
specified {@code
+   *     configClassKey}
+   * @param xface interface of given class
+   * @return created object
+   */
+  private static <T> T createInstanceFromConfig(
+      Configuration conf,
+      String configClassKey,
+      @Nullable Class<? extends T> defaultClass,
+      Class<T> xface) {
+    try {
+      String className = conf.get(configClassKey);
+      Preconditions.checkArgument(
+          className != null || defaultClass != null,
+          String.format(
+              "Configuration does not contains any value under %s key. Unable 
to initialize class instance from configuration. ",
+              configClassKey));
+
+      Class<? extends T> requiredClass = conf.getClass(configClassKey, 
defaultClass, xface);
+
+      return requiredClass.getConstructor().newInstance();
+    } catch (InstantiationException
+        | IllegalAccessException
+        | NoSuchMethodException
+        | InvocationTargetException e) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Unable to create instance of object from configuration under 
key %s.",
+              configClassKey),
+          e);
+    }
+  }
+
+  /**
+   * Creates {@link JobID} with {@code jtIdentifier} specified in hadoop 
{@link Configuration} under
+   * {@link MRJobConfig#ID} key.
+   *
+   * @param conf hadoop {@link Configuration}
+   * @return JobID created from {@link Configuration}
+   */
+  static JobID getJobId(Configuration conf) {
+    String jobJtIdentifier =
+        Preconditions.checkNotNull(
+            conf.get(MRJobConfig.ID),
+            "Configuration must contain jobID under key \"%s\".",
+            HadoopFormatIO.JOB_ID);
+
+    return new JobID(jobJtIdentifier, DEFAULT_JOB_NUMBER);
+  }
+
+  /**
+   * Returns count of the reducers specified under key {@link 
MRJobConfig#NUM_REDUCES} in hadoop
+   * {@link Configuration}.
+   *
+   * @param conf hadoop {@link Configuration}
+   * @return configured count of reducers
+   */
+  static int getReducersCount(Configuration conf) {
+    return conf.getInt(MRJobConfig.NUM_REDUCES, DEFAULT_NUM_REDUCERS);
+  }
+}
diff --git 
a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/IterableCombinerFn.java
 
b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/IterableCombinerFn.java
new file mode 100644
index 000000000000..2d98bcb49f50
--- /dev/null
+++ 
b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/IterableCombinerFn.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hadoop.format;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+
+/**
+ * Collects all items of defined type into one {@link Iterable} container.
+ *
+ * @param <T> Type of the elements to collect
+ */
+class IterableCombinerFn<T>
+    extends Combine.AccumulatingCombineFn<
+        T, IterableCombinerFn.CollectionAccumulator<T>, Iterable<T>> {
+
+  /**
+   * Accumulator for collecting one "shard" of types.
+   *
+   * @param <T> Type of the elements to collect
+   */
+  public static class CollectionAccumulator<T>
+      implements Combine.AccumulatingCombineFn.Accumulator<
+          T, CollectionAccumulator<T>, Iterable<T>> {
+
+    private final List<T> collection;
+
+    private CollectionAccumulator() {
+      this(new ArrayList<>());
+    }
+
+    private CollectionAccumulator(List<T> collection) {
+      Objects.requireNonNull(collection, "Collection can't be null");
+      this.collection = collection;
+    }
+
+    @Override
+    public void addInput(T input) {
+      collection.add(input);
+    }
+
+    @Override
+    public void mergeAccumulator(CollectionAccumulator<T> other) {
+      collection.addAll(other.collection);
+    }
+
+    @Override
+    public Iterable<T> extractOutput() {
+      return collection;
+    }
+  }
+
+  private final TypeDescriptor<T> typeDescriptor;
+
+  IterableCombinerFn(TypeDescriptor<T> typeDescriptor) {
+    this.typeDescriptor = typeDescriptor;
+  }
+
+  @Override
+  public CollectionAccumulator<T> createAccumulator() {
+    return new CollectionAccumulator<>();
+  }
+
+  @Override
+  public TypeDescriptor<Iterable<T>> getOutputType() {
+    return TypeDescriptors.iterables(typeDescriptor);
+  }
+
+  @Override
+  public Coder<Iterable<T>> getDefaultOutputCoder(CoderRegistry registry, 
Coder<T> inputCoder) {
+    return IterableCoder.of(inputCoder);
+  }
+
+  @Override
+  public Coder<CollectionAccumulator<T>> getAccumulatorCoder(
+      CoderRegistry registry, Coder<T> inputCoder) {
+    return new CollectionAccumulatorCoder<>(inputCoder);
+  }
+
+  /**
+   * Coder for {@link CollectionAccumulator} class.
+   *
+   * @param <T> Type of the {@link CollectionAccumulator} class
+   */
+  private static class CollectionAccumulatorCoder<T> extends 
AtomicCoder<CollectionAccumulator<T>> {
+
+    /** List coder is used to en/decode {@link CollectionAccumulator}. */
+    private final ListCoder<T> listCoder;
+
+    /**
+     * Ctor requires coder for the element type.
+     *
+     * @param typeCoder coder for the element type
+     */
+    private CollectionAccumulatorCoder(Coder<T> typeCoder) {
+      this.listCoder = ListCoder.of(typeCoder);
+    }
+
+    @Override
+    public void encode(IterableCombinerFn.CollectionAccumulator<T> value, 
OutputStream outStream)
+        throws IOException {
+      listCoder.encode(value.collection, outStream);
+    }
+
+    @Override
+    public IterableCombinerFn.CollectionAccumulator<T> decode(InputStream 
inStream)
+        throws IOException {
+
+      List<T> decodedList = listCoder.decode(inStream);
+      return new IterableCombinerFn.CollectionAccumulator<>(decodedList);
+    }
+  }
+}
diff --git 
a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/package-info.java
 
b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/package-info.java
new file mode 100644
index 000000000000..a7e7cdcdceac
--- /dev/null
+++ 
b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines transforms for writing to Data sinks that implement {@link
+ * org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO} .
+ *
+ * @see org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO
+ */
+package org.apache.beam.sdk.io.hadoop.format;
diff --git 
a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/EmployeeOutputFormat.java
 
b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/EmployeeOutputFormat.java
new file mode 100644
index 000000000000..ad20e4c49e6c
--- /dev/null
+++ 
b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/EmployeeOutputFormat.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hadoop.format;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.io.hadoop.inputformat.Employee;
+import org.apache.beam.sdk.values.KV;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * This is a valid OutputFormat for writing employee data, available in the 
form of {@code
+ * List<KV>}. {@linkplain EmployeeOutputFormat} is used to test the 
{@linkplain HadoopFormatIO }
+ * sink.
+ */
+public class EmployeeOutputFormat extends OutputFormat<Text, Employee> {
+  private static volatile List<KV<Text, Employee>> output;
+  private static OutputCommitter outputCommitter;
+
+  @Override
+  public RecordWriter<Text, Employee> getRecordWriter(TaskAttemptContext 
context) {
+    return new RecordWriter<Text, Employee>() {
+      @Override
+      public void write(Text key, Employee value) {
+        output.add(KV.of(key, value));
+      }
+
+      @Override
+      public void close(TaskAttemptContext context) {}
+    };
+  }
+
+  @Override
+  public void checkOutputSpecs(JobContext context) {}
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
+    return outputCommitter;
+  }
+
+  static synchronized void initWrittenOutput(OutputCommitter outputCommitter) {
+    EmployeeOutputFormat.outputCommitter = outputCommitter;
+    output = Collections.synchronizedList(new ArrayList<>());
+  }
+
+  static List<KV<Text, Employee>> getWrittenOutput() {
+    return output;
+  }
+
+  static OutputCommitter getOutputCommitter() {
+    return outputCommitter;
+  }
+}
diff --git 
a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronizationTest.java
 
b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronizationTest.java
new file mode 100644
index 000000000000..e8b4d4b17f6a
--- /dev/null
+++ 
b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronizationTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hadoop.format;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+/** Tests functionality of {@link HDFSSynchronization} class. */
+public class HDFSSynchronizationTest {
+
+  public static final String DEFAULT_JOB_ID = String.valueOf(1);
+  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  private HDFSSynchronization tested;
+  private Configuration configuration;
+
+  @Before
+  public void setup() {
+    this.tested = new 
HDFSSynchronization(tmpFolder.getRoot().getAbsolutePath());
+    this.configuration = new Configuration();
+    configuration.set(HadoopFormatIO.JOB_ID, DEFAULT_JOB_ID);
+  }
+
+  /** Tests that job lock will be acquired only once until it is again 
released. */
+  @Test
+  public void tryAcquireJobLockTest() {
+    boolean firstAttempt = tested.tryAcquireJobLock(configuration);
+    boolean secondAttempt = tested.tryAcquireJobLock(configuration);
+    boolean thirdAttempt = tested.tryAcquireJobLock(configuration);
+
+    assertTrue(isFileExists(getJobLockPath()));
+
+    tested.releaseJobIdLock(configuration);
+
+    boolean fourthAttempt = tested.tryAcquireJobLock(configuration);
+    boolean fifthAttempt = tested.tryAcquireJobLock(configuration);
+
+    assertTrue(firstAttempt);
+    assertFalse(secondAttempt);
+    assertFalse(thirdAttempt);
+
+    assertTrue(fourthAttempt);
+    assertFalse(fifthAttempt);
+  }
+
+  /** Missing job id in configuration will throw exception. */
+  @Test(expected = NullPointerException.class)
+  public void testMissingJobId() {
+
+    Configuration conf = new Configuration();
+
+    tested.tryAcquireJobLock(conf);
+  }
+
+  /** Multiple attempts to release job will not throw exception. */
+  @Test
+  public void testMultipleTaskDeletion() {
+    String jobFolder = getFileInJobFolder("");
+
+    tested.tryAcquireJobLock(configuration);
+
+    assertTrue(isFileExists(getJobLockPath()));
+
+    tested.releaseJobIdLock(configuration);
+
+    assertFalse(isFileExists(getJobLockPath()));
+    assertFalse(isFolderExists(jobFolder));
+
+    // any exception will not be thrown
+    tested.releaseJobIdLock(configuration);
+  }
+
+  @Test
+  public void testTaskIdLockAcquire() {
+
+    int tasksCount = 100;
+    for (int i = 0; i < tasksCount; i++) {
+      TaskID taskID = tested.acquireTaskIdLock(configuration);
+      assertTrue(isFileExists(getTaskIdPath(taskID)));
+    }
+
+    String jobFolderName = getFileInJobFolder("");
+    File jobFolder = new File(jobFolderName);
+    assertTrue(jobFolder.isDirectory());
+    // we have to multiply by 2 because crc files exists
+    assertEquals(tasksCount * 2, jobFolder.list().length);
+  }
+
+  @Test
+  public void testTaskAttemptIdAcquire() {
+    int tasksCount = 100;
+    int taskId = 25;
+
+    for (int i = 0; i < tasksCount; i++) {
+      TaskAttemptID taskAttemptID = 
tested.acquireTaskAttemptIdLock(configuration, taskId);
+      assertTrue(isFileExists(getTaskAttemptIdPath(taskId, 
taskAttemptID.getId())));
+    }
+  }
+
+  @Test
+  public void testCatchingRemoteException() throws IOException {
+
+    FileSystem mockedFileSystem = Mockito.mock(FileSystem.class);
+    RemoteException thrownException =
+        new RemoteException(AlreadyBeingCreatedException.class.getName(), 
"Failed to CREATE_FILE");
+    
Mockito.when(mockedFileSystem.createNewFile(Mockito.any())).thenThrow(thrownException);
+
+    HDFSSynchronization synchronization =
+        new HDFSSynchronization("someDir", (conf) -> mockedFileSystem);
+
+    assertFalse(synchronization.tryAcquireJobLock(configuration));
+  }
+
+  private String getTaskAttemptIdPath(int taskId, int taskAttemptId) {
+    return getFileInJobFolder(taskId + "_" + taskAttemptId);
+  }
+
+  private String getTaskIdPath(TaskID taskID) {
+    return getFileInJobFolder(String.valueOf(taskID.getId()));
+  }
+
+  private String getJobLockPath() {
+    return getFileInJobFolder("_job");
+  }
+
+  private String getFileInJobFolder(String filename) {
+    return tmpFolder.getRoot().getAbsolutePath()
+        + File.separator
+        + DEFAULT_JOB_ID
+        + File.separator
+        + filename;
+  }
+
+  private boolean isFileExists(String path) {
+    File file = new File(path);
+    return file.exists() && !file.isDirectory();
+  }
+
+  private boolean isFolderExists(String path) {
+    File file = new File(path);
+    return file.exists();
+  }
+}
diff --git 
a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOIT.java
 
b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOIT.java
new file mode 100644
index 000000000000..7a76f27d904f
--- /dev/null
+++ 
b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOIT.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hadoop.format;
+
+import static org.apache.beam.sdk.io.common.IOITHelper.executeWithRetry;
+import static 
org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions;
+import static org.apache.beam.sdk.io.common.TestRow.getExpectedHashForRowCount;
+
+import java.sql.SQLException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.common.DatabaseTestHelper;
+import org.apache.beam.sdk.io.common.HashingFn;
+import org.apache.beam.sdk.io.common.PostgresIOTestPipelineOptions;
+import org.apache.beam.sdk.io.common.TestRow;
+import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
+import org.apache.beam.sdk.io.hadoop.inputformat.TestRowDBWritable;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
+import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.postgresql.ds.PGSimpleDataSource;
+
+//sequencefile
+
+/**
+ * A test of {@link org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO} on an 
independent postgres
+ * instance.
+ *
+ * <p>This test requires a running instance of Postgres. Pass in connection 
information using
+ * PipelineOptions:
+ *
+ * <pre>
+ *  ./gradlew integrationTest -p sdks/java/io/hadoop-format/
+ *   -DintegrationTestPipelineOptions='[
+ *     "--postgresServerName=1.2.3.4",
+ *     "--postgresUsername=postgres",
+ *     "--postgresDatabaseName=myfancydb",
+ *     "--postgresPassword=mypass",
+ *     "--postgresSsl=false",
+ *     "--numberOfRecords=1000" ]'
+ *  --tests org.apache.beam.sdk.io.hadoop.format.HadoopFormatIOIT
+ *  -DintegrationTestRunner=direct
+ * </pre>
+ *
+ * <p>Please see 'build_rules.gradle' file for instructions regarding running 
this test using Beam
+ * performance testing framework.
+ */
+public class HadoopFormatIOIT {
+
+  private static PGSimpleDataSource dataSource;
+  private static Integer numberOfRows;
+  private static String tableName;
+  private static SerializableConfiguration hadoopConfiguration;
+
+  @Rule public TestPipeline writePipeline = TestPipeline.create();
+  @Rule public TestPipeline readPipeline = TestPipeline.create();
+  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    PostgresIOTestPipelineOptions options =
+        readIOTestPipelineOptions(PostgresIOTestPipelineOptions.class);
+
+    dataSource = DatabaseTestHelper.getPostgresDataSource(options);
+    numberOfRows = options.getNumberOfRecords();
+    tableName = DatabaseTestHelper.getTestTableName("HadoopFormatIOIT");
+
+    executeWithRetry(HadoopFormatIOIT::createTable);
+    setupHadoopConfiguration(options);
+  }
+
+  private static void createTable() throws SQLException {
+    DatabaseTestHelper.createTable(dataSource, tableName);
+  }
+
+  private static void setupHadoopConfiguration(PostgresIOTestPipelineOptions 
options) {
+    Configuration conf = new Configuration();
+    DBConfiguration.configureDB(
+        conf,
+        "org.postgresql.Driver",
+        DatabaseTestHelper.getPostgresDBUrl(options),
+        options.getPostgresUsername(),
+        options.getPostgresPassword());
+    conf.set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, tableName);
+    conf.set(DBConfiguration.OUTPUT_FIELD_COUNT_PROPERTY, "2");
+    conf.setStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, "id", "name");
+
+    conf.setClass(HadoopFormatIO.OUTPUT_KEY_CLASS, TestRowDBWritable.class, 
Object.class);
+    conf.setClass(HadoopFormatIO.OUTPUT_VALUE_CLASS, NullWritable.class, 
Object.class);
+    conf.setClass(
+        HadoopFormatIO.OUTPUT_FORMAT_CLASS_ATTR, DBOutputFormat.class, 
OutputFormat.class);
+    conf.set(HadoopFormatIO.JOB_ID, String.valueOf(1));
+
+    hadoopConfiguration = new SerializableConfiguration(conf);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    executeWithRetry(HadoopFormatIOIT::deleteTable);
+  }
+
+  private static void deleteTable() throws SQLException {
+    DatabaseTestHelper.deleteTable(dataSource, tableName);
+  }
+
+  @Test
+  public void writeUsingHadoopOutputFormat() {
+    writePipeline
+        .apply("Generate sequence", GenerateSequence.from(0).to(numberOfRows))
+        .apply("Produce db rows", ParDo.of(new 
TestRow.DeterministicallyConstructTestRowFn()))
+        .apply("Construct rows for DBOutputFormat", ParDo.of(new 
ConstructDBOutputFormatRowFn()))
+        .apply(
+            "Write using HadoopOutputFormat",
+            HadoopFormatIO.<TestRowDBWritable, NullWritable>write()
+                .withConfiguration(hadoopConfiguration.get())
+                .withPartitioning()
+                .withExternalSynchronization(
+                    new 
HDFSSynchronization(tmpFolder.getRoot().getAbsolutePath())));
+
+    writePipeline.run().waitUntilFinish();
+
+    PCollection<String> consolidatedHashcode =
+        readPipeline
+            .apply(
+                "Read using JDBCIO",
+                JdbcIO.<String>read()
+                    
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
+                    .withQuery(String.format("select name,id from %s;", 
tableName))
+                    .withRowMapper(
+                        (JdbcIO.RowMapper<String>) resultSet -> 
resultSet.getString("name"))
+                    .withCoder(StringUtf8Coder.of()))
+            .apply("Calculate hashcode", Combine.globally(new HashingFn()));
+    
PAssert.thatSingleton(consolidatedHashcode).isEqualTo(getExpectedHashForRowCount(numberOfRows));
+
+    readPipeline.run().waitUntilFinish();
+  }
+
+  /**
+   * Uses the input {@link TestRow} values as seeds to produce new {@link KV}s 
for {@link
+   * HadoopFormatIO}.
+   */
+  public static class ConstructDBOutputFormatRowFn
+      extends DoFn<TestRow, KV<TestRowDBWritable, NullWritable>> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(
+          KV.of(new TestRowDBWritable(c.element().id(), c.element().name()), 
NullWritable.get()));
+    }
+  }
+}
diff --git 
a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOSequenceFileTest.java
 
b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOSequenceFileTest.java
new file mode 100644
index 000000000000..1a19dd937a5d
--- /dev/null
+++ 
b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOSequenceFileTest.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hadoop.format;
+
+import static org.hamcrest.Matchers.equalTo;
+
+import java.io.File;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.beam.examples.WordCount;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.hamcrest.MatcherAssert;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+/** Tests {@link HadoopFormatIO} output with batch and stream pipeline. */
+public class HadoopFormatIOSequenceFileTest {
+
+  private static final Instant START_TIME = new Instant(0);
+  private static final String TEST_FOLDER_NAME = "test";
+  private static final String LOCKS_FOLDER_NAME = "locks";
+  private static final int REDUCERS_COUNT = 2;
+
+  private static final List<String> SENTENCES =
+      Arrays.asList(
+          "Hello world this is first streamed event",
+          "Hello again this is sedcond streamed event",
+          "Third time Hello event created",
+          "And last event will was sent now",
+          "Hello from second window",
+          "First event from second window");
+
+  private static final List<String> FIRST_WIN_WORDS = SENTENCES.subList(0, 4);
+  private static final List<String> SECOND_WIN_WORDS = SENTENCES.subList(4, 6);
+  private static final Duration WINDOW_DURATION = Duration.standardMinutes(1);
+  private static final SerializableFunction<KV<String, Long>, KV<Text, 
LongWritable>>
+      KV_STR_INT_2_TXT_LONGWRITABLE =
+          (KV<String, Long> element) ->
+              KV.of(new Text(element.getKey()), new 
LongWritable(element.getValue()));
+
+  private static Map<String, Long> computeWordCounts(List<String> sentences) {
+    return sentences
+        .stream()
+        .flatMap(s -> Stream.of(s.split("\\W+")))
+        .map(String::toLowerCase)
+        .collect(Collectors.toMap(Function.identity(), s -> 1L, Long::sum));
+  }
+
+  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  @Test
+  @Category(ValidatesRunner.class)
+  public void batchTest() {
+
+    String outputDir = getOutputDirPath("batchTest");
+
+    Configuration conf =
+        createWriteConf(
+            SequenceFileOutputFormat.class,
+            Text.class,
+            LongWritable.class,
+            outputDir,
+            REDUCERS_COUNT,
+            "0");
+
+    executeBatchTest(
+        HadoopFormatIO.<Text, LongWritable>write()
+            .withConfiguration(conf)
+            .withPartitioning()
+            .withExternalSynchronization(new 
HDFSSynchronization(getLocksDirPath())),
+        outputDir);
+
+    Assert.assertEquals(
+        "In lock folder shouldn't be any file", 0, new 
File(getLocksDirPath()).list().length);
+  }
+
+  @Test
+  @Category(ValidatesRunner.class)
+  public void batchTestWithoutPartitioner() {
+    String outputDir = getOutputDirPath("batchTestWithoutPartitioner");
+
+    Configuration conf =
+        createWriteConf(
+            SequenceFileOutputFormat.class,
+            Text.class,
+            LongWritable.class,
+            outputDir,
+            REDUCERS_COUNT,
+            "0");
+
+    executeBatchTest(
+        HadoopFormatIO.<Text, LongWritable>write()
+            .withConfiguration(conf)
+            .withoutPartitioning()
+            .withExternalSynchronization(new 
HDFSSynchronization(getLocksDirPath())),
+        outputDir);
+
+    Assert.assertEquals(
+        "In lock folder shouldn't be any file", 0, new 
File(getLocksDirPath()).list().length);
+  }
+
+  private void executeBatchTest(HadoopFormatIO.Write<Text, LongWritable> 
write, String outputDir) {
+
+    pipeline
+        .apply(Create.of(SENTENCES))
+        .apply(ParDo.of(new ConvertToLowerCaseFn()))
+        .apply(new WordCount.CountWords())
+        .apply(
+            "ConvertToHadoopFormat",
+            ParDo.of(new 
ConvertToHadoopFormatFn<>(KV_STR_INT_2_TXT_LONGWRITABLE)))
+        .setTypeDescriptor(
+            TypeDescriptors.kvs(
+                new TypeDescriptor<Text>() {}, new 
TypeDescriptor<LongWritable>() {}))
+        .apply(write);
+
+    pipeline.run();
+
+    Map<String, Long> results = loadWrittenDataAsMap(outputDir);
+
+    MatcherAssert.assertThat(results.entrySet(), 
equalTo(computeWordCounts(SENTENCES).entrySet()));
+  }
+
+  private List<KV<Text, LongWritable>> loadWrittenData(String outputDir) {
+    return Arrays.stream(Objects.requireNonNull(new File(outputDir).list()))
+        .filter(fileName -> fileName.startsWith("part-r"))
+        .map(fileName -> outputDir + File.separator + fileName)
+        .flatMap(this::extractResultsFromFile)
+        .collect(Collectors.toList());
+  }
+
+  private String getOutputDirPath(String testName) {
+    return Paths.get(tmpFolder.getRoot().getAbsolutePath(), TEST_FOLDER_NAME + 
"/" + testName)
+        .toAbsolutePath()
+        .toString();
+  }
+
+  private String getLocksDirPath() {
+    return Paths.get(tmpFolder.getRoot().getAbsolutePath(), LOCKS_FOLDER_NAME)
+        .toAbsolutePath()
+        .toString();
+  }
+
+  private Stream<KV<Text, LongWritable>> extractResultsFromFile(String 
fileName) {
+    try (SequenceFileRecordReader<Text, LongWritable> reader = new 
SequenceFileRecordReader<>()) {
+      Path path = new Path(fileName);
+      TaskAttemptContext taskContext =
+          HadoopFormats.createTaskAttemptContext(new Configuration(), new 
JobID("readJob", 0), 0);
+      reader.initialize(
+          new FileSplit(path, 0L, Long.MAX_VALUE, new String[] {"localhost"}), 
taskContext);
+      List<KV<Text, LongWritable>> result = new ArrayList<>();
+
+      while (reader.nextKeyValue()) {
+        result.add(
+            KV.of(
+                new Text(reader.getCurrentKey().toString()),
+                new LongWritable(reader.getCurrentValue().get())));
+      }
+
+      return result.stream();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static Configuration createWriteConf(
+      Class<?> outputFormatClass,
+      Class<?> keyClass,
+      Class<?> valueClass,
+      String path,
+      Integer reducersCount,
+      String jobId) {
+
+    return getConfiguration(outputFormatClass, keyClass, valueClass, path, 
reducersCount, jobId);
+  }
+
+  private static Configuration getConfiguration(
+      Class<?> outputFormatClass,
+      Class<?> keyClass,
+      Class<?> valueClass,
+      String path,
+      Integer reducersCount,
+      String jobId) {
+    Configuration conf = new Configuration();
+
+    conf.setClass(HadoopFormatIO.OUTPUT_FORMAT_CLASS_ATTR, outputFormatClass, 
OutputFormat.class);
+    conf.setClass(HadoopFormatIO.OUTPUT_KEY_CLASS, keyClass, Object.class);
+    conf.setClass(HadoopFormatIO.OUTPUT_VALUE_CLASS, valueClass, Object.class);
+    conf.setInt(HadoopFormatIO.NUM_REDUCES, reducersCount);
+    conf.set(HadoopFormatIO.OUTPUT_DIR, path);
+    conf.set(HadoopFormatIO.JOB_ID, jobId);
+    return conf;
+  }
+
+  @Test
+  public void streamTest() {
+
+    TestStream<String> stringsStream =
+        TestStream.create(StringUtf8Coder.of())
+            .advanceWatermarkTo(START_TIME)
+            .addElements(event(FIRST_WIN_WORDS.get(0), 2L))
+            .advanceWatermarkTo(START_TIME.plus(Duration.standardSeconds(27L)))
+            .addElements(
+                event(FIRST_WIN_WORDS.get(1), 25L),
+                event(FIRST_WIN_WORDS.get(2), 18L),
+                event(FIRST_WIN_WORDS.get(3), 28L))
+            .advanceWatermarkTo(START_TIME.plus(Duration.standardSeconds(65L)))
+            .addElements(event(SECOND_WIN_WORDS.get(0), 61L), 
event(SECOND_WIN_WORDS.get(1), 63L))
+            .advanceWatermarkToInfinity();
+
+    String outputDirPath = getOutputDirPath("streamTest");
+
+    PCollection<KV<Text, LongWritable>> dataToWrite =
+        pipeline
+            .apply(stringsStream)
+            .apply(Window.into(FixedWindows.of(WINDOW_DURATION)))
+            .apply(ParDo.of(new ConvertToLowerCaseFn()))
+            .apply(new WordCount.CountWords())
+            .apply(
+                "ConvertToHadoopFormat",
+                ParDo.of(new 
ConvertToHadoopFormatFn<>(KV_STR_INT_2_TXT_LONGWRITABLE)))
+            .setTypeDescriptor(
+                TypeDescriptors.kvs(
+                    new TypeDescriptor<Text>() {}, new 
TypeDescriptor<LongWritable>() {}));
+
+    ConfigTransform<Text, LongWritable> configurationTransformation =
+        new ConfigTransform<>(outputDirPath, Text.class, LongWritable.class);
+
+    dataToWrite.apply(
+        HadoopFormatIO.<Text, LongWritable>write()
+            .withConfigurationTransform(configurationTransformation)
+            .withExternalSynchronization(new 
HDFSSynchronization(getLocksDirPath())));
+
+    pipeline.run();
+
+    Map<String, Long> values = loadWrittenDataAsMap(outputDirPath);
+
+    MatcherAssert.assertThat(
+        values.entrySet(), 
equalTo(computeWordCounts(FIRST_WIN_WORDS).entrySet()));
+
+    Assert.assertEquals(
+        "In lock folder shouldn't be any file", 0, new 
File(getLocksDirPath()).list().length);
+  }
+
+  private Map<String, Long> loadWrittenDataAsMap(String outputDirPath) {
+    return loadWrittenData(outputDirPath)
+        .stream()
+        .collect(
+            Collectors.toMap(
+                kv -> kv.getKey().toString(),
+                kv -> kv.getValue().get(),
+                (first, second) -> first + second));
+  }
+
+  private <T> TimestampedValue<T> event(T eventValue, Long timestamp) {
+
+    return TimestampedValue.of(eventValue, START_TIME.plus(new 
Duration(timestamp)));
+  }
+
+  private static class ConvertToHadoopFormatFn<InputT, OutputT> extends 
DoFn<InputT, OutputT> {
+
+    private SerializableFunction<InputT, OutputT> transformFn;
+
+    ConvertToHadoopFormatFn(SerializableFunction<InputT, OutputT> transformFn) 
{
+      this.transformFn = transformFn;
+    }
+
+    @DoFn.ProcessElement
+    public void processElement(@DoFn.Element InputT element, 
OutputReceiver<OutputT> outReceiver) {
+      outReceiver.output(transformFn.apply(element));
+    }
+  }
+
+  private static class ConvertToLowerCaseFn extends DoFn<String, String> {
+    @DoFn.ProcessElement
+    public void processElement(@DoFn.Element String element, 
OutputReceiver<String> receiver) {
+      receiver.output(element.toLowerCase());
+    }
+
+    @Override
+    public TypeDescriptor<String> getOutputTypeDescriptor() {
+      return super.getOutputTypeDescriptor();
+    }
+  }
+
+  private static class ConfigTransform<KeyT, ValueT>
+      extends PTransform<PCollection<? extends KV<KeyT, ValueT>>, 
PCollectionView<Configuration>> {
+
+    private String outputDirPath;
+    private Class<?> keyClass;
+    private Class<?> valueClass;
+    private int windowNum = 0;
+
+    private ConfigTransform(String outputDirPath, Class<?> keyClass, Class<?> 
valueClass) {
+      this.outputDirPath = outputDirPath;
+      this.keyClass = keyClass;
+      this.valueClass = valueClass;
+    }
+
+    @Override
+    public PCollectionView<Configuration> expand(PCollection<? extends 
KV<KeyT, ValueT>> input) {
+
+      Configuration conf =
+          createWriteConf(
+              SequenceFileOutputFormat.class,
+              keyClass,
+              valueClass,
+              outputDirPath,
+              REDUCERS_COUNT,
+              String.valueOf(windowNum++));
+      return input
+          .getPipeline()
+          .apply(Create.<Configuration>of(conf))
+          .apply(View.<Configuration>asSingleton().withDefaultValue(conf));
+    }
+  }
+}
diff --git 
a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOTest.java
 
b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOTest.java
new file mode 100644
index 000000000000..c435ab755d57
--- /dev/null
+++ 
b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOTest.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hadoop.format;
+
+import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.hadoop.inputformat.Employee;
+import org.apache.beam.sdk.io.hadoop.inputformat.TestEmployeeDataSet;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+/** Unit tests for {@link HadoopFormatIO}. */
+@RunWith(MockitoJUnitRunner.class)
+public class HadoopFormatIOTest {
+
+  private static final int REDUCERS_COUNT = 2;
+  private static final String LOCKS_FOLDER_NAME = "locks";
+  private static Configuration conf;
+
+  @Rule public final transient TestPipeline p = TestPipeline.create();
+  @Rule public ExpectedException thrown = ExpectedException.none();
+  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @Before
+  public void setUp() {
+    conf = loadTestConfiguration(EmployeeOutputFormat.class, Text.class, 
Employee.class);
+    OutputCommitter mockedOutputCommitter = 
Mockito.mock(OutputCommitter.class);
+    EmployeeOutputFormat.initWrittenOutput(mockedOutputCommitter);
+  }
+
+  private static Configuration loadTestConfiguration(
+      Class<?> outputFormatClassName, Class<?> keyClass, Class<?> valueClass) {
+    Configuration conf = new Configuration();
+    conf.setClass(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, outputFormatClassName, 
OutputFormat.class);
+    conf.setClass(MRJobConfig.OUTPUT_KEY_CLASS, keyClass, Object.class);
+    conf.setClass(MRJobConfig.OUTPUT_VALUE_CLASS, valueClass, Object.class);
+    conf.setInt(MRJobConfig.NUM_REDUCES, REDUCERS_COUNT);
+    conf.set(MRJobConfig.ID, String.valueOf(1));
+    return conf;
+  }
+
+  /**
+   * This test validates {@link HadoopFormatIO.Write Write} transform object 
creation fails with
+   * null configuration. {@link 
HadoopFormatIO.Write.Builder#withConfiguration(Configuration)
+   * withConfiguration(Configuration)} method checks configuration is null and 
throws exception if
+   * it is null.
+   */
+  @Test
+  public void testWriteObjectCreationFailsIfConfigurationIsNull() {
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("Hadoop configuration cannot be null");
+    HadoopFormatIO.<Text, Employee>write()
+        .withConfiguration(null)
+        .withPartitioning()
+        .withExternalSynchronization(new 
HDFSSynchronization(getLocksDirPath()));
+  }
+
+  /**
+   * This test validates functionality of {@link
+   * HadoopFormatIO.Write.Builder#withConfiguration(Configuration) 
withConfiguration(Configuration)}
+   * function when Hadoop OutputFormat class is not provided by the user in 
configuration.
+   */
+  @Test
+  public void testWriteValidationFailsMissingOutputFormatInConf() {
+    Configuration configuration = new Configuration();
+    configuration.setClass(HadoopFormatIO.OUTPUT_KEY_CLASS, Text.class, 
Object.class);
+    configuration.setClass(HadoopFormatIO.OUTPUT_VALUE_CLASS, Employee.class, 
Object.class);
+
+    HadoopFormatIO.Write<Text, Employee> writeWithWrongConfig =
+        HadoopFormatIO.<Text, Employee>write()
+            .withConfiguration(configuration)
+            .withPartitioning()
+            .withExternalSynchronization(new 
HDFSSynchronization(getLocksDirPath()));
+
+    p.apply(Create.of(TestEmployeeDataSet.getEmployeeData()))
+        .setTypeDescriptor(
+            TypeDescriptors.kvs(new TypeDescriptor<Text>() {}, new 
TypeDescriptor<Employee>() {}))
+        .apply("Write", writeWithWrongConfig);
+
+    thrown.expect(Pipeline.PipelineExecutionException.class);
+    thrown.expectMessage("Configuration must contain 
\"mapreduce.job.outputformat.class\"");
+
+    p.run().waitUntilFinish();
+  }
+
+  /**
+   * This test validates functionality of {@link
+   * HadoopFormatIO.Write.Builder#withConfiguration(Configuration) 
withConfiguration(Configuration)}
+   * function when key class is not provided by the user in configuration.
+   */
+  @Test
+  public void testWriteValidationFailsMissingKeyClassInConf() {
+    Configuration configuration = new Configuration();
+    configuration.setClass(
+        HadoopFormatIO.OUTPUT_FORMAT_CLASS_ATTR, TextOutputFormat.class, 
OutputFormat.class);
+    configuration.setClass(HadoopFormatIO.OUTPUT_VALUE_CLASS, Employee.class, 
Object.class);
+
+    runValidationPipeline(configuration);
+
+    thrown.expect(Pipeline.PipelineExecutionException.class);
+    thrown.expectMessage("Configuration must contain 
\"mapreduce.job.output.key.class\"");
+
+    p.run().waitUntilFinish();
+  }
+
+  private void runValidationPipeline(Configuration configuration) {
+    p.apply(Create.of(TestEmployeeDataSet.getEmployeeData()))
+        .setTypeDescriptor(
+            TypeDescriptors.kvs(new TypeDescriptor<Text>() {}, new 
TypeDescriptor<Employee>() {}))
+        .apply(
+            "Write",
+            HadoopFormatIO.<Text, Employee>write()
+                .withConfiguration(configuration)
+                .withPartitioning()
+                .withExternalSynchronization(new 
HDFSSynchronization(getLocksDirPath())));
+  }
+
+  /**
+   * This test validates functionality of {@link
+   * HadoopFormatIO.Write.Builder#withConfiguration(Configuration) 
withConfiguration(Configuration)}
+   * function when value class is not provided by the user in configuration.
+   */
+  @Test
+  public void testWriteValidationFailsMissingValueClassInConf() {
+    Configuration configuration = new Configuration();
+    configuration.setClass(
+        HadoopFormatIO.OUTPUT_FORMAT_CLASS_ATTR, TextOutputFormat.class, 
OutputFormat.class);
+    configuration.setClass(HadoopFormatIO.OUTPUT_KEY_CLASS, Text.class, 
Object.class);
+
+    runValidationPipeline(configuration);
+
+    thrown.expect(Pipeline.PipelineExecutionException.class);
+    thrown.expectMessage("Configuration must contain 
\"mapreduce.job.output.value.class\"");
+
+    p.run().waitUntilFinish();
+  }
+
+  /**
+   * This test validates functionality of {@link
+   * HadoopFormatIO.Write.Builder#withConfiguration(Configuration) 
withConfiguration(Configuration)}
+   * function when job id is not provided by the user in configuration.
+   */
+  @Test
+  public void testWriteValidationFailsMissingJobIDInConf() {
+    Configuration configuration = new Configuration();
+    configuration.setClass(
+        HadoopFormatIO.OUTPUT_FORMAT_CLASS_ATTR, TextOutputFormat.class, 
OutputFormat.class);
+    configuration.setClass(HadoopFormatIO.OUTPUT_KEY_CLASS, Text.class, 
Object.class);
+    configuration.setClass(HadoopFormatIO.OUTPUT_VALUE_CLASS, Employee.class, 
Object.class);
+    configuration.set(HadoopFormatIO.OUTPUT_DIR, 
tmpFolder.getRoot().getAbsolutePath());
+
+    runValidationPipeline(configuration);
+
+    thrown.expect(Pipeline.PipelineExecutionException.class);
+    thrown.expectMessage("Configuration must contain \"mapreduce.job.id\"");
+
+    p.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testWritingData() throws IOException {
+    conf.set(HadoopFormatIO.OUTPUT_DIR, tmpFolder.getRoot().getAbsolutePath());
+    List<KV<Text, Employee>> data = TestEmployeeDataSet.getEmployeeData();
+    PCollection<KV<Text, Employee>> input =
+        p.apply(Create.of(data))
+            .setTypeDescriptor(
+                TypeDescriptors.kvs(
+                    new TypeDescriptor<Text>() {}, new 
TypeDescriptor<Employee>() {}));
+
+    input.apply(
+        "Write",
+        HadoopFormatIO.<Text, Employee>write()
+            .withConfiguration(conf)
+            .withPartitioning()
+            .withExternalSynchronization(new 
HDFSSynchronization(getLocksDirPath())));
+    p.run();
+
+    List<KV<Text, Employee>> writtenOutput = 
EmployeeOutputFormat.getWrittenOutput();
+    assertEquals(data.size(), writtenOutput.size());
+    assertTrue(data.containsAll(writtenOutput));
+    assertTrue(writtenOutput.containsAll(data));
+
+    
Mockito.verify(EmployeeOutputFormat.getOutputCommitter()).commitJob(Mockito.any());
+    Mockito.verify(EmployeeOutputFormat.getOutputCommitter(), 
Mockito.times(REDUCERS_COUNT))
+        .commitTask(Mockito.any());
+  }
+
+  @Test
+  public void testWritingDataFailInvalidKeyType() {
+
+    conf.set(HadoopFormatIO.OUTPUT_DIR, tmpFolder.getRoot().getAbsolutePath());
+    List<KV<String, Employee>> data = new ArrayList<>();
+    data.add(KV.of("key", new Employee("name", "address")));
+    PCollection<KV<String, Employee>> input =
+        p.apply("CreateData", Create.of(data))
+            .setTypeDescriptor(
+                TypeDescriptors.kvs(
+                    new TypeDescriptor<String>() {}, new 
TypeDescriptor<Employee>() {}));
+
+    thrown.expect(Pipeline.PipelineExecutionException.class);
+    thrown.expectMessage(String.class.getName());
+
+    input.apply(
+        "Write",
+        HadoopFormatIO.<String, Employee>write()
+            .withConfiguration(conf)
+            .withPartitioning()
+            .withExternalSynchronization(new 
HDFSSynchronization(getLocksDirPath())));
+    p.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testWritingDataFailInvalidValueType() {
+
+    conf.set(HadoopFormatIO.OUTPUT_DIR, tmpFolder.getRoot().getAbsolutePath());
+    List<KV<Text, Text>> data = new ArrayList<>();
+    data.add(KV.of(new Text("key"), new Text("value")));
+    TypeDescriptor<Text> textTypeDescriptor = new TypeDescriptor<Text>() {};
+    PCollection<KV<Text, Text>> input =
+        p.apply(Create.of(data))
+            .setTypeDescriptor(TypeDescriptors.kvs(textTypeDescriptor, 
textTypeDescriptor));
+
+    thrown.expect(Pipeline.PipelineExecutionException.class);
+    thrown.expectMessage(Text.class.getName());
+
+    input.apply(
+        "Write",
+        HadoopFormatIO.<Text, Text>write()
+            .withConfiguration(conf)
+            .withPartitioning()
+            .withExternalSynchronization(new 
HDFSSynchronization(getLocksDirPath())));
+
+    p.run().waitUntilFinish();
+  }
+
+  /**
+   * This test validates functionality of {@link
+   * HadoopFormatIO.Write#populateDisplayData(DisplayData.Builder)
+   * populateDisplayData(DisplayData.WriteBuilder)}.
+   */
+  @Test
+  public void testWriteDisplayData() {
+    HadoopFormatIO.Write<String, String> write =
+        HadoopFormatIO.<String, String>write()
+            .withConfiguration(conf)
+            .withPartitioning()
+            .withExternalSynchronization(new 
HDFSSynchronization(getLocksDirPath()));
+    DisplayData displayData = DisplayData.from(write);
+
+    assertThat(
+        displayData,
+        hasDisplayItem(
+            HadoopFormatIO.OUTPUT_FORMAT_CLASS_ATTR,
+            conf.get(HadoopFormatIO.OUTPUT_FORMAT_CLASS_ATTR)));
+    assertThat(
+        displayData,
+        hasDisplayItem(HadoopFormatIO.OUTPUT_KEY_CLASS, 
conf.get(HadoopFormatIO.OUTPUT_KEY_CLASS)));
+    assertThat(
+        displayData,
+        hasDisplayItem(
+            HadoopFormatIO.OUTPUT_VALUE_CLASS, 
conf.get(HadoopFormatIO.OUTPUT_VALUE_CLASS)));
+    assertThat(
+        displayData,
+        hasDisplayItem(
+            HadoopFormatIO.PARTITIONER_CLASS_ATTR,
+            HadoopFormats.DEFAULT_PARTITIONER_CLASS_ATTR.getName()));
+  }
+
+  private String getLocksDirPath() {
+    return Paths.get(tmpFolder.getRoot().getAbsolutePath(), LOCKS_FOLDER_NAME)
+        .toAbsolutePath()
+        .toString();
+  }
+}
diff --git 
a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/IterableCombinerTest.java
 
b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/IterableCombinerTest.java
new file mode 100644
index 000000000000..db9f378ea08a
--- /dev/null
+++ 
b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/IterableCombinerTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hadoop.format;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Stream;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.hamcrest.MatcherAssert;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+
+/** Tests Iterable combiner whether works correctly. */
+public class IterableCombinerTest {
+
+  private static final TypeDescriptor<String> STRING_TYPE_DESCRIPTOR = 
TypeDescriptors.strings();
+
+  private static final List<String> FIRST_ITEMS = Arrays.asList("a", "b", "c");
+  private static final List<String> SECOND_ITEMS = Arrays.asList("c", "d", 
"e");
+
+  @Test
+  public void testCombining() {
+
+    IterableCombinerFn<String> tested = new 
IterableCombinerFn<>(STRING_TYPE_DESCRIPTOR);
+
+    IterableCombinerFn.CollectionAccumulator<String> first = 
tested.createAccumulator();
+    FIRST_ITEMS.forEach(first::addInput);
+
+    IterableCombinerFn.CollectionAccumulator<String> second = 
tested.createAccumulator();
+    SECOND_ITEMS.forEach(second::addInput);
+
+    IterableCombinerFn.CollectionAccumulator<String> merged =
+        tested.mergeAccumulators(Arrays.asList(first, second));
+
+    IterableCombinerFn.CollectionAccumulator<String> compacted = 
tested.compact(merged);
+
+    String[] allItems =
+        Stream.of(FIRST_ITEMS, 
SECOND_ITEMS).flatMap(List::stream).toArray(String[]::new);
+
+    MatcherAssert.assertThat(compacted.extractOutput(), 
Matchers.containsInAnyOrder(allItems));
+  }
+
+  @Test
+  public void testSerializing() throws IOException {
+
+    IterableCombinerFn<String> tested = new 
IterableCombinerFn<>(STRING_TYPE_DESCRIPTOR);
+    IterableCombinerFn.CollectionAccumulator<String> originalAccumulator =
+        tested.createAccumulator();
+
+    FIRST_ITEMS.forEach(originalAccumulator::addInput);
+
+    Coder<IterableCombinerFn.CollectionAccumulator<String>> accumulatorCoder =
+        tested.getAccumulatorCoder(null, StringUtf8Coder.of());
+
+    byte[] bytes;
+
+    try (ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream()) {
+      accumulatorCoder.encode(originalAccumulator, byteArrayOutputStream);
+      byteArrayOutputStream.flush();
+
+      bytes = byteArrayOutputStream.toByteArray();
+    }
+
+    IterableCombinerFn.CollectionAccumulator<String> decodedAccumulator;
+
+    try (ByteArrayInputStream byteArrayInputStream = new 
ByteArrayInputStream(bytes)) {
+      decodedAccumulator = accumulatorCoder.decode(byteArrayInputStream);
+    }
+
+    String[] originalItems = FIRST_ITEMS.toArray(new String[0]);
+
+    MatcherAssert.assertThat(
+        originalAccumulator.extractOutput(), 
Matchers.containsInAnyOrder(originalItems));
+    MatcherAssert.assertThat(
+        decodedAccumulator.extractOutput(), 
Matchers.containsInAnyOrder(originalItems));
+  }
+}
diff --git 
a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java
 
b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java
index ff36d5dbe51b..20c78c1cff69 100644
--- 
a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java
+++ 
b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java
@@ -25,7 +25,7 @@
  * Test Utils used in {@link EmployeeInputFormat} and {@link 
ReuseObjectsEmployeeInputFormat} for
  * computing splits.
  */
-class TestEmployeeDataSet {
+public class TestEmployeeDataSet {
   public static final long NUMBER_OF_RECORDS_IN_EACH_SPLIT = 5L;
   public static final long NUMBER_OF_SPLITS = 3L;
   private static final List<KV<String, String>> data = new ArrayList<>();
diff --git 
a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestRowDBWritable.java
 
b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestRowDBWritable.java
index ea3aeca7a13d..68aaaa8f9ef8 100644
--- 
a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestRowDBWritable.java
+++ 
b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestRowDBWritable.java
@@ -35,11 +35,18 @@
  * org.apache.hadoop.mapreduce.lib.db.DBInputFormat}.
  */
 @DefaultCoder(AvroCoder.class)
-class TestRowDBWritable extends TestRow implements DBWritable, Writable {
+public class TestRowDBWritable extends TestRow implements DBWritable, Writable 
{
 
   private Integer id;
   private String name;
 
+  public TestRowDBWritable() {}
+
+  public TestRowDBWritable(Integer id, String name) {
+    this.id = id;
+    this.name = name;
+  }
+
   @Override
   public Integer id() {
     return id;
diff --git a/sdks/java/javadoc/build.gradle b/sdks/java/javadoc/build.gradle
index 61565b0a5fe1..e80f2634bdb8 100644
--- a/sdks/java/javadoc/build.gradle
+++ b/sdks/java/javadoc/build.gradle
@@ -59,6 +59,7 @@ def exportedJavadocProjects = [
   ':beam-sdks-java-io-google-cloud-platform',
   ':beam-sdks-java-io-hadoop-common',
   ':beam-sdks-java-io-hadoop-file-system',
+  ':beam-sdks-java-io-hadoop-format',
   ':beam-sdks-java-io-hadoop-input-format',
   ':beam-sdks-java-io-hbase',
   ':beam-sdks-java-io-hcatalog',
diff --git a/settings.gradle b/settings.gradle
index 115cb9ce5b13..76444444d82a 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -44,6 +44,10 @@ include "beam-runners-flink_2.11-job-server"
 project(":beam-runners-flink_2.11-job-server").dir = 
file("runners/flink/job-server")
 include "beam-runners-flink_2.11-job-server-container"
 project(":beam-runners-flink_2.11-job-server-container").dir = 
file("runners/flink/job-server-container")
+include "beam-runners-gcp-gcemd"
+project(":beam-runners-gcp-gcemd").dir = file("runners/gcp/gcemd")
+include "beam-runners-gcp-gcsproxy"
+project(":beam-runners-gcp-gcsproxy").dir = file("runners/gcp/gcsproxy")
 include "beam-runners-gearpump"
 project(":beam-runners-gearpump").dir = file("runners/gearpump")
 include "beam-runners-google-cloud-dataflow-java"
@@ -132,6 +136,8 @@ include "beam-sdks-java-io-hadoop-file-system"
 project(":beam-sdks-java-io-hadoop-file-system").dir = 
file("sdks/java/io/hadoop-file-system")
 include "beam-sdks-java-io-hadoop-input-format"
 project(":beam-sdks-java-io-hadoop-input-format").dir = 
file("sdks/java/io/hadoop-input-format")
+include "beam-sdks-java-io-hadoop-format"
+project(":beam-sdks-java-io-hadoop-format").dir = 
file("sdks/java/io/hadoop-format")
 include "beam-sdks-java-io-hbase"
 project(":beam-sdks-java-io-hbase").dir = file("sdks/java/io/hbase")
 include "beam-sdks-java-io-hcatalog"


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 172642)
    Time Spent: 13h  (was: 12h 50m)

> Add streaming support for HadoopOutputFormatIO
> ----------------------------------------------
>
>                 Key: BEAM-5309
>                 URL: https://issues.apache.org/jira/browse/BEAM-5309
>             Project: Beam
>          Issue Type: Sub-task
>          Components: io-java-hadoop
>            Reporter: Alexey Romanenko
>            Assignee: David Hrbacek
>            Priority: Minor
>          Time Spent: 13h
>  Remaining Estimate: 0h
>
> design doc: https://s.apache.org/beam-streaming-hofio



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to