[flink] convert tabs to 2 spaces

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8852eb15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8852eb15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8852eb15

Branch: refs/heads/master
Commit: 8852eb15deec6a7354f29c894d7d2626748d5704
Parents: 39f08fa
Author: Maximilian Michels <[email protected]>
Authored: Wed Mar 2 23:03:37 2016 +0100
Committer: Davor Bonaci <[email protected]>
Committed: Fri Mar 4 10:04:23 2016 -0800

----------------------------------------------------------------------
 .../FlinkPipelineExecutionEnvironment.java      |  394 +++---
 .../flink/dataflow/FlinkPipelineOptions.java    |  102 +-
 .../flink/dataflow/FlinkPipelineRunner.java     |  308 ++--
 .../flink/dataflow/FlinkRunnerResult.java       |   64 +-
 .../flink/dataflow/examples/TFIDF.java          |  720 +++++-----
 .../flink/dataflow/examples/WordCount.java      |  154 +-
 .../examples/streaming/JoinExamples.java        |  208 +--
 .../KafkaWindowedWordCountExample.java          |  204 +--
 .../examples/streaming/WindowedWordCount.java   |  160 +--
 .../flink/dataflow/io/ConsoleIO.java            |   82 +-
 .../FlinkBatchPipelineTranslator.java           |  230 +--
 .../FlinkBatchTransformTranslators.java         |  892 ++++++------
 .../FlinkBatchTranslationContext.java           |  154 +-
 .../translation/FlinkPipelineTranslator.java    |    6 +-
 .../FlinkStreamingPipelineTranslator.java       |  218 +--
 .../FlinkStreamingTransformTranslators.java     |  684 ++++-----
 .../FlinkStreamingTranslationContext.java       |   88 +-
 .../FlinkCoGroupKeyedListAggregator.java        |   48 +-
 .../functions/FlinkCreateFunction.java          |   50 +-
 .../functions/FlinkDoFnFunction.java            |  300 ++--
 .../FlinkKeyedListAggregationFunction.java      |   74 +-
 .../functions/FlinkMultiOutputDoFnFunction.java |  242 ++--
 .../FlinkMultiOutputPruningFunction.java        |   22 +-
 .../functions/FlinkPartialReduceFunction.java   |   50 +-
 .../functions/FlinkReduceFunction.java          |   32 +-
 .../translation/functions/UnionCoder.java       |  218 +--
 .../translation/types/CoderComparator.java      |  362 ++---
 .../translation/types/CoderTypeInformation.java |  162 +--
 .../translation/types/CoderTypeSerializer.java  |  228 +--
 .../types/InspectableByteArrayOutputStream.java |   12 +-
 .../translation/types/KvCoderComperator.java    |  452 +++---
 .../types/KvCoderTypeInformation.java           |  298 ++--
 .../types/VoidCoderTypeSerializer.java          |  154 +-
 .../wrappers/CombineFnAggregatorWrapper.java    |   94 +-
 .../wrappers/DataInputViewWrapper.java          |   50 +-
 .../wrappers/DataOutputViewWrapper.java         |   32 +-
 .../SerializableFnAggregatorWrapper.java        |   92 +-
 .../translation/wrappers/SinkOutputFormat.java  |  160 +--
 .../translation/wrappers/SourceInputFormat.java |  240 ++--
 .../translation/wrappers/SourceInputSplit.java  |   30 +-
 .../streaming/FlinkAbstractParDoWrapper.java    |  432 +++---
 .../FlinkGroupAlsoByWindowWrapper.java          | 1116 +++++++--------
 .../streaming/FlinkGroupByKeyWrapper.java       |   46 +-
 .../streaming/FlinkParDoBoundMultiWrapper.java  |   70 +-
 .../streaming/FlinkParDoBoundWrapper.java       |  102 +-
 .../io/FlinkStreamingCreateFunction.java        |   52 +-
 .../streaming/io/UnboundedFlinkSource.java      |   76 +-
 .../streaming/io/UnboundedSocketSource.java     |  372 ++---
 .../streaming/io/UnboundedSourceWrapper.java    |  182 +--
 .../state/AbstractFlinkTimerInternals.java      |  178 +--
 .../streaming/state/FlinkStateInternals.java    | 1338 +++++++++---------
 .../streaming/state/StateCheckpointReader.java  |  122 +-
 .../streaming/state/StateCheckpointUtils.java   |  232 +--
 .../streaming/state/StateCheckpointWriter.java  |  198 +--
 .../wrappers/streaming/state/StateType.java     |   68 +-
 .../dataartisans/flink/dataflow/AvroITCase.java |  128 +-
 .../flink/dataflow/FlattenizeITCase.java        |   60 +-
 .../flink/dataflow/FlinkTestPipeline.java       |   74 +-
 .../flink/dataflow/JoinExamplesITCase.java      |  122 +-
 .../flink/dataflow/MaybeEmptyTestITCase.java    |   64 +-
 .../flink/dataflow/ParDoMultiOutputITCase.java  |  128 +-
 .../flink/dataflow/ReadSourceITCase.java        |  244 ++--
 .../dataflow/RemoveDuplicatesEmptyITCase.java   |   48 +-
 .../flink/dataflow/RemoveDuplicatesITCase.java  |   50 +-
 .../flink/dataflow/SideInputITCase.java         |   52 +-
 .../flink/dataflow/TfIdfITCase.java             |   58 +-
 .../flink/dataflow/WordCountITCase.java         |   54 +-
 .../flink/dataflow/WordCountJoin2ITCase.java    |  196 +--
 .../flink/dataflow/WordCountJoin3ITCase.java    |  234 +--
 .../flink/dataflow/WriteSinkITCase.java         |  218 +--
 .../streaming/GroupAlsoByWindowTest.java        |  920 ++++++------
 .../dataflow/streaming/GroupByNullKeyTest.java  |  138 +-
 .../streaming/StateSerializationTest.java       |  506 +++----
 .../streaming/TopWikipediaSessionsITCase.java   |  178 +--
 .../flink/dataflow/util/JoinExamples.java       |  232 +--
 75 files changed, 8179 insertions(+), 8179 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineExecutionEnvironment.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineExecutionEnvironment.java
index 796849d..c2139c6 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineExecutionEnvironment.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineExecutionEnvironment.java
@@ -40,228 +40,228 @@ import java.util.List;
  */
 public class FlinkPipelineExecutionEnvironment {
 
-       private static final Logger LOG = 
LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class);
 
-       private final FlinkPipelineOptions options;
+  private final FlinkPipelineOptions options;
 
-       /**
-        * The Flink Batch execution environment. This is instantiated to 
either a
-        * {@link org.apache.flink.api.java.CollectionEnvironment},
-        * a {@link org.apache.flink.api.java.LocalEnvironment} or
-        * a {@link org.apache.flink.api.java.RemoteEnvironment}, depending on 
the configuration
-        * options.
-        */
-       private ExecutionEnvironment flinkBatchEnv;
+  /**
+   * The Flink Batch execution environment. This is instantiated to either a
+   * {@link org.apache.flink.api.java.CollectionEnvironment},
+   * a {@link org.apache.flink.api.java.LocalEnvironment} or
+   * a {@link org.apache.flink.api.java.RemoteEnvironment}, depending on the 
configuration
+   * options.
+   */
+  private ExecutionEnvironment flinkBatchEnv;
 
 
-       /**
-        * The Flink Streaming execution environment. This is instantiated to 
either a
-        * {@link 
org.apache.flink.streaming.api.environment.LocalStreamEnvironment} or
-        * a {@link 
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment}, depending
-        * on the configuration options, and more specifically, the url of the 
master.
-        */
-       private StreamExecutionEnvironment flinkStreamEnv;
+  /**
+   * The Flink Streaming execution environment. This is instantiated to either 
a
+   * {@link org.apache.flink.streaming.api.environment.LocalStreamEnvironment} 
or
+   * a {@link 
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment}, depending
+   * on the configuration options, and more specifically, the url of the 
master.
+   */
+  private StreamExecutionEnvironment flinkStreamEnv;
 
-       /**
-        * Translator for this FlinkPipelineRunner. Its role is to translate 
the Beam operators to
-        * their Flink counterparts. Based on the options provided by the user, 
if we have a streaming job,
-        * this is instantiated as a {@link FlinkStreamingPipelineTranslator}. 
In other case, i.e. a batch job,
-        * a {@link FlinkBatchPipelineTranslator} is created.
-        */
-       private FlinkPipelineTranslator flinkPipelineTranslator;
+  /**
+   * Translator for this FlinkPipelineRunner. Its role is to translate the 
Beam operators to
+   * their Flink counterparts. Based on the options provided by the user, if 
we have a streaming job,
+   * this is instantiated as a {@link FlinkStreamingPipelineTranslator}. In 
other case, i.e. a batch job,
+   * a {@link FlinkBatchPipelineTranslator} is created.
+   */
+  private FlinkPipelineTranslator flinkPipelineTranslator;
 
-       /**
-        * Creates a {@link FlinkPipelineExecutionEnvironment} with the 
user-specified parameters in the
-        * provided {@link FlinkPipelineOptions}.
-        *
-        * @param options the user-defined pipeline options.
-        * */
-       public FlinkPipelineExecutionEnvironment(FlinkPipelineOptions options) {
-               this.options = Preconditions.checkNotNull(options);
-               this.createPipelineExecutionEnvironment();
-               this.createPipelineTranslator();
-       }
+  /**
+   * Creates a {@link FlinkPipelineExecutionEnvironment} with the 
user-specified parameters in the
+   * provided {@link FlinkPipelineOptions}.
+   *
+   * @param options the user-defined pipeline options.
+   * */
+  public FlinkPipelineExecutionEnvironment(FlinkPipelineOptions options) {
+    this.options = Preconditions.checkNotNull(options);
+    this.createPipelineExecutionEnvironment();
+    this.createPipelineTranslator();
+  }
 
-       /**
-        * Depending on the type of job (Streaming or Batch) and the 
user-specified options,
-        * this method creates the adequate ExecutionEnvironment.
-        */
-       private void createPipelineExecutionEnvironment() {
-               if (options.isStreaming()) {
-                       createStreamExecutionEnvironment();
-               } else {
-                       createBatchExecutionEnvironment();
-               }
-       }
+  /**
+   * Depending on the type of job (Streaming or Batch) and the user-specified 
options,
+   * this method creates the adequate ExecutionEnvironment.
+   */
+  private void createPipelineExecutionEnvironment() {
+    if (options.isStreaming()) {
+      createStreamExecutionEnvironment();
+    } else {
+      createBatchExecutionEnvironment();
+    }
+  }
 
-       /**
-        * Depending on the type of job (Streaming or Batch), this method 
creates the adequate job graph
-        * translator. In the case of batch, it will work with {@link 
org.apache.flink.api.java.DataSet},
-        * while for streaming, it will work with {@link 
org.apache.flink.streaming.api.datastream.DataStream}.
-        */
-       private void createPipelineTranslator() {
-               checkInitializationState();
-               if (this.flinkPipelineTranslator != null) {
-                       throw new 
IllegalStateException("FlinkPipelineTranslator already initialized.");
-               }
+  /**
+   * Depending on the type of job (Streaming or Batch), this method creates 
the adequate job graph
+   * translator. In the case of batch, it will work with {@link 
org.apache.flink.api.java.DataSet},
+   * while for streaming, it will work with {@link 
org.apache.flink.streaming.api.datastream.DataStream}.
+   */
+  private void createPipelineTranslator() {
+    checkInitializationState();
+    if (this.flinkPipelineTranslator != null) {
+      throw new IllegalStateException("FlinkPipelineTranslator already 
initialized.");
+    }
 
-               this.flinkPipelineTranslator = options.isStreaming() ?
-                               new 
FlinkStreamingPipelineTranslator(flinkStreamEnv, options) :
-                               new FlinkBatchPipelineTranslator(flinkBatchEnv, 
options);
-       }
+    this.flinkPipelineTranslator = options.isStreaming() ?
+        new FlinkStreamingPipelineTranslator(flinkStreamEnv, options) :
+        new FlinkBatchPipelineTranslator(flinkBatchEnv, options);
+  }
 
-       /**
-        * Depending on if the job is a Streaming or a Batch one, this method 
creates
-        * the necessary execution environment and pipeline translator, and 
translates
-        * the {@link com.google.cloud.dataflow.sdk.values.PCollection} program 
into
-        * a {@link org.apache.flink.api.java.DataSet} or {@link 
org.apache.flink.streaming.api.datastream.DataStream}
-        * one.
-        * */
-       public void translate(Pipeline pipeline) {
-               checkInitializationState();
-               if(this.flinkBatchEnv == null && this.flinkStreamEnv == null) {
-                       createPipelineExecutionEnvironment();
-               }
-               if (this.flinkPipelineTranslator == null) {
-                       createPipelineTranslator();
-               }
-               this.flinkPipelineTranslator.translate(pipeline);
-       }
+  /**
+   * Depending on if the job is a Streaming or a Batch one, this method creates
+   * the necessary execution environment and pipeline translator, and 
translates
+   * the {@link com.google.cloud.dataflow.sdk.values.PCollection} program into
+   * a {@link org.apache.flink.api.java.DataSet} or {@link 
org.apache.flink.streaming.api.datastream.DataStream}
+   * one.
+   * */
+  public void translate(Pipeline pipeline) {
+    checkInitializationState();
+    if(this.flinkBatchEnv == null && this.flinkStreamEnv == null) {
+      createPipelineExecutionEnvironment();
+    }
+    if (this.flinkPipelineTranslator == null) {
+      createPipelineTranslator();
+    }
+    this.flinkPipelineTranslator.translate(pipeline);
+  }
 
-       /**
-        * Launches the program execution.
-        * */
-       public JobExecutionResult executePipeline() throws Exception {
-               if (options.isStreaming()) {
-                       if (this.flinkStreamEnv == null) {
-                               throw new 
RuntimeException("FlinkPipelineExecutionEnvironment not initialized.");
-                       }
-                       if (this.flinkPipelineTranslator == null) {
-                               throw new 
RuntimeException("FlinkPipelineTranslator not initialized.");
-                       }
-                       return this.flinkStreamEnv.execute();
-               } else {
-                       if (this.flinkBatchEnv == null) {
-                               throw new 
RuntimeException("FlinkPipelineExecutionEnvironment not initialized.");
-                       }
-                       if (this.flinkPipelineTranslator == null) {
-                               throw new 
RuntimeException("FlinkPipelineTranslator not initialized.");
-                       }
-                       return this.flinkBatchEnv.execute();
-               }
-       }
+  /**
+   * Launches the program execution.
+   * */
+  public JobExecutionResult executePipeline() throws Exception {
+    if (options.isStreaming()) {
+      if (this.flinkStreamEnv == null) {
+        throw new RuntimeException("FlinkPipelineExecutionEnvironment not 
initialized.");
+      }
+      if (this.flinkPipelineTranslator == null) {
+        throw new RuntimeException("FlinkPipelineTranslator not initialized.");
+      }
+      return this.flinkStreamEnv.execute();
+    } else {
+      if (this.flinkBatchEnv == null) {
+        throw new RuntimeException("FlinkPipelineExecutionEnvironment not 
initialized.");
+      }
+      if (this.flinkPipelineTranslator == null) {
+        throw new RuntimeException("FlinkPipelineTranslator not initialized.");
+      }
+      return this.flinkBatchEnv.execute();
+    }
+  }
 
-       /**
-        * If the submitted job is a batch processing job, this method creates 
the adequate
-        * Flink {@link org.apache.flink.api.java.ExecutionEnvironment} 
depending
-        * on the user-specified options.
-        */
-       private void createBatchExecutionEnvironment() {
-               if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) {
-                       throw new 
RuntimeException("FlinkPipelineExecutionEnvironment already initialized.");
-               }
+  /**
+   * If the submitted job is a batch processing job, this method creates the 
adequate
+   * Flink {@link org.apache.flink.api.java.ExecutionEnvironment} depending
+   * on the user-specified options.
+   */
+  private void createBatchExecutionEnvironment() {
+    if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) {
+      throw new RuntimeException("FlinkPipelineExecutionEnvironment already 
initialized.");
+    }
 
-               LOG.info("Creating the required Batch Execution Environment.");
+    LOG.info("Creating the required Batch Execution Environment.");
 
-               String masterUrl = options.getFlinkMaster();
-               this.flinkStreamEnv = null;
+    String masterUrl = options.getFlinkMaster();
+    this.flinkStreamEnv = null;
 
-               // depending on the master, create the right environment.
-               if (masterUrl.equals("[local]")) {
-                       this.flinkBatchEnv = 
ExecutionEnvironment.createLocalEnvironment();
-               } else if (masterUrl.equals("[collection]")) {
-                       this.flinkBatchEnv = new CollectionEnvironment();
-               } else if (masterUrl.equals("[auto]")) {
-                       this.flinkBatchEnv = 
ExecutionEnvironment.getExecutionEnvironment();
-               } else if (masterUrl.matches(".*:\\d*")) {
-                       String[] parts = masterUrl.split(":");
-                       List<String> stagingFiles = options.getFilesToStage();
-                       this.flinkBatchEnv = 
ExecutionEnvironment.createRemoteEnvironment(parts[0],
-                                       Integer.parseInt(parts[1]),
-                                       stagingFiles.toArray(new 
String[stagingFiles.size()]));
-               } else {
-                       LOG.warn("Unrecognized Flink Master URL {}. Defaulting 
to [auto].", masterUrl);
-                       this.flinkBatchEnv = 
ExecutionEnvironment.getExecutionEnvironment();
-               }
+    // depending on the master, create the right environment.
+    if (masterUrl.equals("[local]")) {
+      this.flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment();
+    } else if (masterUrl.equals("[collection]")) {
+      this.flinkBatchEnv = new CollectionEnvironment();
+    } else if (masterUrl.equals("[auto]")) {
+      this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
+    } else if (masterUrl.matches(".*:\\d*")) {
+      String[] parts = masterUrl.split(":");
+      List<String> stagingFiles = options.getFilesToStage();
+      this.flinkBatchEnv = 
ExecutionEnvironment.createRemoteEnvironment(parts[0],
+          Integer.parseInt(parts[1]),
+          stagingFiles.toArray(new String[stagingFiles.size()]));
+    } else {
+      LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", 
masterUrl);
+      this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
+    }
 
-               // set the correct parallelism.
-               if (options.getParallelism() != -1 && !(this.flinkBatchEnv 
instanceof CollectionEnvironment)) {
-                       
this.flinkBatchEnv.setParallelism(options.getParallelism());
-               }
+    // set the correct parallelism.
+    if (options.getParallelism() != -1 && !(this.flinkBatchEnv instanceof 
CollectionEnvironment)) {
+      this.flinkBatchEnv.setParallelism(options.getParallelism());
+    }
 
-               // set parallelism in the options (required by some execution 
code)
-               options.setParallelism(flinkBatchEnv.getParallelism());
-       }
+    // set parallelism in the options (required by some execution code)
+    options.setParallelism(flinkBatchEnv.getParallelism());
+  }
 
-       /**
-        * If the submitted job is a stream processing job, this method creates 
the adequate
-        * Flink {@link 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} depending
-        * on the user-specified options.
-        */
-       private void createStreamExecutionEnvironment() {
-               if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) {
-                       throw new 
RuntimeException("FlinkPipelineExecutionEnvironment already initialized.");
-               }
+  /**
+   * If the submitted job is a stream processing job, this method creates the 
adequate
+   * Flink {@link 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} depending
+   * on the user-specified options.
+   */
+  private void createStreamExecutionEnvironment() {
+    if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) {
+      throw new RuntimeException("FlinkPipelineExecutionEnvironment already 
initialized.");
+    }
 
-               LOG.info("Creating the required Streaming Environment.");
+    LOG.info("Creating the required Streaming Environment.");
 
-               String masterUrl = options.getFlinkMaster();
-               this.flinkBatchEnv = null;
+    String masterUrl = options.getFlinkMaster();
+    this.flinkBatchEnv = null;
 
-               // depending on the master, create the right environment.
-               if (masterUrl.equals("[local]")) {
-                       this.flinkStreamEnv = 
StreamExecutionEnvironment.createLocalEnvironment();
-               } else if (masterUrl.equals("[auto]")) {
-                       this.flinkStreamEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               } else if (masterUrl.matches(".*:\\d*")) {
-                       String[] parts = masterUrl.split(":");
-                       List<String> stagingFiles = options.getFilesToStage();
-                       this.flinkStreamEnv = 
StreamExecutionEnvironment.createRemoteEnvironment(parts[0],
-                                       Integer.parseInt(parts[1]), 
stagingFiles.toArray(new String[stagingFiles.size()]));
-               } else {
-                       LOG.warn("Unrecognized Flink Master URL {}. Defaulting 
to [auto].", masterUrl);
-                       this.flinkStreamEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               }
+    // depending on the master, create the right environment.
+    if (masterUrl.equals("[local]")) {
+      this.flinkStreamEnv = 
StreamExecutionEnvironment.createLocalEnvironment();
+    } else if (masterUrl.equals("[auto]")) {
+      this.flinkStreamEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
+    } else if (masterUrl.matches(".*:\\d*")) {
+      String[] parts = masterUrl.split(":");
+      List<String> stagingFiles = options.getFilesToStage();
+      this.flinkStreamEnv = 
StreamExecutionEnvironment.createRemoteEnvironment(parts[0],
+          Integer.parseInt(parts[1]), stagingFiles.toArray(new 
String[stagingFiles.size()]));
+    } else {
+      LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", 
masterUrl);
+      this.flinkStreamEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
+    }
 
-               // set the correct parallelism.
-               if (options.getParallelism() != -1) {
-                       
this.flinkStreamEnv.setParallelism(options.getParallelism());
-               }
+    // set the correct parallelism.
+    if (options.getParallelism() != -1) {
+      this.flinkStreamEnv.setParallelism(options.getParallelism());
+    }
 
-               // set parallelism in the options (required by some execution 
code)
-               options.setParallelism(flinkStreamEnv.getParallelism());
+    // set parallelism in the options (required by some execution code)
+    options.setParallelism(flinkStreamEnv.getParallelism());
 
-               // default to event time
-               
this.flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+    // default to event time
+    
this.flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 
-               // for the following 2 parameters, a value of -1 means that 
Flink will use
-               // the default values as specified in the configuration.
-               int numRetries = options.getNumberOfExecutionRetries();
-               if (numRetries != -1) {
-                       
this.flinkStreamEnv.setNumberOfExecutionRetries(numRetries);
-               }
-               long retryDelay = options.getExecutionRetryDelay();
-               if (retryDelay != -1) {
-                       
this.flinkStreamEnv.getConfig().setExecutionRetryDelay(retryDelay);
-               }
+    // for the following 2 parameters, a value of -1 means that Flink will use
+    // the default values as specified in the configuration.
+    int numRetries = options.getNumberOfExecutionRetries();
+    if (numRetries != -1) {
+      this.flinkStreamEnv.setNumberOfExecutionRetries(numRetries);
+    }
+    long retryDelay = options.getExecutionRetryDelay();
+    if (retryDelay != -1) {
+      this.flinkStreamEnv.getConfig().setExecutionRetryDelay(retryDelay);
+    }
 
-               // A value of -1 corresponds to disabled checkpointing (see 
CheckpointConfig in Flink).
-               // If the value is not -1, then the validity checks are applied.
-               // By default, checkpointing is disabled.
-               long checkpointInterval = options.getCheckpointingInterval();
-               if(checkpointInterval != -1) {
-                       if (checkpointInterval < 1) {
-                               throw new IllegalArgumentException("The 
checkpoint interval must be positive");
-                       }
-                       
this.flinkStreamEnv.enableCheckpointing(checkpointInterval);
-               }
-       }
+    // A value of -1 corresponds to disabled checkpointing (see 
CheckpointConfig in Flink).
+    // If the value is not -1, then the validity checks are applied.
+    // By default, checkpointing is disabled.
+    long checkpointInterval = options.getCheckpointingInterval();
+    if(checkpointInterval != -1) {
+      if (checkpointInterval < 1) {
+        throw new IllegalArgumentException("The checkpoint interval must be 
positive");
+      }
+      this.flinkStreamEnv.enableCheckpointing(checkpointInterval);
+    }
+  }
 
-       private void checkInitializationState() {
-               if (options.isStreaming() && this.flinkBatchEnv != null) {
-                       throw new IllegalStateException("Attempted to run a 
Streaming Job with a Batch Execution Environment.");
-               } else if (!options.isStreaming() && this.flinkStreamEnv != 
null) {
-                       throw new IllegalStateException("Attempted to run a 
Batch Job with a Streaming Execution Environment.");
-               }
-       }
+  private void checkInitializationState() {
+    if (options.isStreaming() && this.flinkBatchEnv != null) {
+      throw new IllegalStateException("Attempted to run a Streaming Job with a 
Batch Execution Environment.");
+    } else if (!options.isStreaming() && this.flinkStreamEnv != null) {
+      throw new IllegalStateException("Attempted to run a Batch Job with a 
Streaming Execution Environment.");
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineOptions.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineOptions.java
index 2429cac..fabbfad 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineOptions.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineOptions.java
@@ -31,61 +31,61 @@ import java.util.List;
  */
 public interface FlinkPipelineOptions extends PipelineOptions, 
ApplicationNameOptions, StreamingOptions {
 
-       /**
-        * List of local files to make available to workers.
-        * <p>
-        * Jars are placed on the worker's classpath.
-        * <p>
-        * The default value is the list of jars from the main program's 
classpath.
-        */
-       @Description("Jar-Files to send to all workers and put on the 
classpath. " +
-                       "The default value is all files from the classpath.")
-       @JsonIgnore
-       List<String> getFilesToStage();
-       void setFilesToStage(List<String> value);
+  /**
+   * List of local files to make available to workers.
+   * <p>
+   * Jars are placed on the worker's classpath.
+   * <p>
+   * The default value is the list of jars from the main program's classpath.
+   */
+  @Description("Jar-Files to send to all workers and put on the classpath. " +
+      "The default value is all files from the classpath.")
+  @JsonIgnore
+  List<String> getFilesToStage();
+  void setFilesToStage(List<String> value);
 
-       /**
-        * The job name is used to identify jobs running on a Flink cluster.
-        */
-       @Description("Dataflow job name, to uniquely identify active jobs. "
-                       + "Defaults to using the 
ApplicationName-UserName-Date.")
-       @Default.InstanceFactory(DataflowPipelineOptions.JobNameFactory.class)
-       String getJobName();
-       void setJobName(String value);
+  /**
+   * The job name is used to identify jobs running on a Flink cluster.
+   */
+  @Description("Dataflow job name, to uniquely identify active jobs. "
+      + "Defaults to using the ApplicationName-UserName-Date.")
+  @Default.InstanceFactory(DataflowPipelineOptions.JobNameFactory.class)
+  String getJobName();
+  void setJobName(String value);
 
-       /**
-        * The url of the Flink JobManager on which to execute pipelines. This 
can either be
-        * the the address of a cluster JobManager, in the form "host:port" or 
one of the special
-        * Strings "[local]", "[collection]" or "[auto]". "[local]" will start 
a local Flink
-        * Cluster in the JVM, "[collection]" will execute the pipeline on Java 
Collections while
-        * "[auto]" will let the system decide where to execute the pipeline 
based on the environment.
-        */
-       @Description("Address of the Flink Master where the Pipeline should be 
executed. Can" +
-                       " either be of the form \"host:port\" or one of the 
special values [local], " +
-                       "[collection] or [auto].")
-       String getFlinkMaster();
-       void setFlinkMaster(String value);
+  /**
+   * The url of the Flink JobManager on which to execute pipelines. This can 
either be
+   * the the address of a cluster JobManager, in the form "host:port" or one 
of the special
+   * Strings "[local]", "[collection]" or "[auto]". "[local]" will start a 
local Flink
+   * Cluster in the JVM, "[collection]" will execute the pipeline on Java 
Collections while
+   * "[auto]" will let the system decide where to execute the pipeline based 
on the environment.
+   */
+  @Description("Address of the Flink Master where the Pipeline should be 
executed. Can" +
+      " either be of the form \"host:port\" or one of the special values 
[local], " +
+      "[collection] or [auto].")
+  String getFlinkMaster();
+  void setFlinkMaster(String value);
 
-       @Description("The degree of parallelism to be used when distributing 
operations onto workers.")
-       @Default.Integer(-1)
-       Integer getParallelism();
-       void setParallelism(Integer value);
+  @Description("The degree of parallelism to be used when distributing 
operations onto workers.")
+  @Default.Integer(-1)
+  Integer getParallelism();
+  void setParallelism(Integer value);
 
-       @Description("The interval between consecutive checkpoints (i.e. 
snapshots of the current pipeline state used for " +
-                       "fault tolerance).")
-       @Default.Long(-1L)
-       Long getCheckpointingInterval();
-       void setCheckpointingInterval(Long interval);
+  @Description("The interval between consecutive checkpoints (i.e. snapshots 
of the current pipeline state used for " +
+      "fault tolerance).")
+  @Default.Long(-1L)
+  Long getCheckpointingInterval();
+  void setCheckpointingInterval(Long interval);
 
-       @Description("Sets the number of times that failed tasks are 
re-executed. " +
-                       "A value of zero effectively disables fault tolerance. 
A value of -1 indicates " +
-                       "that the system default value (as defined in the 
configuration) should be used.")
-       @Default.Integer(-1)
-       Integer getNumberOfExecutionRetries();
-       void setNumberOfExecutionRetries(Integer retries);
+  @Description("Sets the number of times that failed tasks are re-executed. " +
+      "A value of zero effectively disables fault tolerance. A value of -1 
indicates " +
+      "that the system default value (as defined in the configuration) should 
be used.")
+  @Default.Integer(-1)
+  Integer getNumberOfExecutionRetries();
+  void setNumberOfExecutionRetries(Integer retries);
 
-       @Description("Sets the delay between executions. A value of {@code -1} 
indicates that the default value should be used.")
-       @Default.Long(-1L)
-       Long getExecutionRetryDelay();
-       void setExecutionRetryDelay(Long delay);
+  @Description("Sets the delay between executions. A value of {@code -1} 
indicates that the default value should be used.")
+  @Default.Long(-1L)
+  Long getExecutionRetryDelay();
+  void setExecutionRetryDelay(Long delay);
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
index 7ea8370..742a316 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
@@ -47,158 +47,158 @@ import java.util.Map;
  */
 public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
 
-       private static final Logger LOG = 
LoggerFactory.getLogger(FlinkPipelineRunner.class);
-
-       /**
-        * Provided options.
-        */
-       private final FlinkPipelineOptions options;
-
-       private final FlinkPipelineExecutionEnvironment flinkJobEnv;
-
-       /**
-        * Construct a runner from the provided options.
-        *
-        * @param options Properties which configure the runner.
-        * @return The newly created runner.
-        */
-       public static FlinkPipelineRunner fromOptions(PipelineOptions options) {
-               FlinkPipelineOptions flinkOptions =
-                               
PipelineOptionsValidator.validate(FlinkPipelineOptions.class, options);
-               ArrayList<String> missing = new ArrayList<>();
-
-               if (flinkOptions.getAppName() == null) {
-                       missing.add("appName");
-               }
-               if (missing.size() > 0) {
-                       throw new IllegalArgumentException(
-                                       "Missing required values: " + 
Joiner.on(',').join(missing));
-               }
-
-               if (flinkOptions.getFilesToStage() == null) {
-                       
flinkOptions.setFilesToStage(detectClassPathResourcesToStage(
-                                       
DataflowPipelineRunner.class.getClassLoader()));
-                       LOG.info("PipelineOptions.filesToStage was not 
specified. "
-                                                       + "Defaulting to files 
from the classpath: will stage {} files. "
-                                                       + "Enable logging at 
DEBUG level to see which files will be staged.",
-                                       flinkOptions.getFilesToStage().size());
-                       LOG.debug("Classpath elements: {}", 
flinkOptions.getFilesToStage());
-               }
-
-               // Verify jobName according to service requirements.
-               String jobName = flinkOptions.getJobName().toLowerCase();
-               
Preconditions.checkArgument(jobName.matches("[a-z]([-a-z0-9]*[a-z0-9])?"), 
"JobName invalid; " +
-                               "the name must consist of only the characters " 
+ "[-a-z0-9], starting with a letter " +
-                               "and ending with a letter " + "or number");
-               Preconditions.checkArgument(jobName.length() <= 40,
-                               "JobName too long; must be no more than 40 
characters in length");
-
-               // Set Flink Master to [auto] if no option was specified.
-               if (flinkOptions.getFlinkMaster() == null) {
-                       flinkOptions.setFlinkMaster("[auto]");
-               }
-
-               return new FlinkPipelineRunner(flinkOptions);
-       }
-
-       private FlinkPipelineRunner(FlinkPipelineOptions options) {
-               this.options = options;
-               this.flinkJobEnv = new 
FlinkPipelineExecutionEnvironment(options);
-       }
-
-       @Override
-       public FlinkRunnerResult run(Pipeline pipeline) {
-               LOG.info("Executing pipeline using FlinkPipelineRunner.");
-
-               LOG.info("Translating pipeline to Flink program.");
-
-               this.flinkJobEnv.translate(pipeline);
-
-               LOG.info("Starting execution of Flink program.");
-               
-               JobExecutionResult result;
-               try {
-                       result = this.flinkJobEnv.executePipeline();
-               } catch (Exception e) {
-                       LOG.error("Pipeline execution failed", e);
-                       throw new RuntimeException("Pipeline execution failed", 
e);
-               }
-
-               LOG.info("Execution finished in {} msecs", 
result.getNetRuntime());
-
-               Map<String, Object> accumulators = 
result.getAllAccumulatorResults();
-               if (accumulators != null && !accumulators.isEmpty()) {
-                       LOG.info("Final aggregator values:");
-
-                       for (Map.Entry<String, Object> entry : 
result.getAllAccumulatorResults().entrySet()) {
-                               LOG.info("{} : {}", entry.getKey(), 
entry.getValue());
-                       }
-               }
-
-               return new FlinkRunnerResult(accumulators, 
result.getNetRuntime());
-       }
-
-       /**
-        * For testing.
-        */
-       public FlinkPipelineOptions getPipelineOptions() {
-               return options;
-       }
-
-       /**
-        * Constructs a runner with default properties for testing.
-        *
-        * @return The newly created runner.
-        */
-       public static FlinkPipelineRunner createForTest(boolean streaming) {
-               FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
-               // we use [auto] for testing since this will make it pick up 
the Testing
-               // ExecutionEnvironment
-               options.setFlinkMaster("[auto]");
-               options.setStreaming(streaming);
-               return new FlinkPipelineRunner(options);
-       }
-
-       @Override
-       public <Output extends POutput, Input extends PInput> Output apply(
-                       PTransform<Input, Output> transform, Input input) {
-               return super.apply(transform, input);
-       }
-
-       
/////////////////////////////////////////////////////////////////////////////
-
-       @Override
-       public String toString() {
-               return "DataflowPipelineRunner#" + hashCode();
-       }
-
-       /**
-        * Attempts to detect all the resources the class loader has access to. 
This does not recurse
-        * to class loader parents stopping it from pulling in resources from 
the system class loader.
-        *
-        * @param classLoader The URLClassLoader to use to detect resources to 
stage.
-        * @return A list of absolute paths to the resources the class loader 
uses.
-        * @throws IllegalArgumentException If either the class loader is not a 
URLClassLoader or one
-        *                                  of the resources the class loader 
exposes is not a file resource.
-        */
-       protected static List<String> 
detectClassPathResourcesToStage(ClassLoader classLoader) {
-               if (!(classLoader instanceof URLClassLoader)) {
-                       String message = String.format("Unable to use 
ClassLoader to detect classpath elements. "
-                                       + "Current ClassLoader is %s, only 
URLClassLoaders are supported.", classLoader);
-                       LOG.error(message);
-                       throw new IllegalArgumentException(message);
-               }
-
-               List<String> files = new ArrayList<>();
-               for (URL url : ((URLClassLoader) classLoader).getURLs()) {
-                       try {
-                               files.add(new 
File(url.toURI()).getAbsolutePath());
-                       } catch (IllegalArgumentException | URISyntaxException 
e) {
-                               String message = String.format("Unable to 
convert url (%s) to file.", url);
-                               LOG.error(message);
-                               throw new IllegalArgumentException(message, e);
-                       }
-               }
-               return files;
-       }
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkPipelineRunner.class);
+
+  /**
+   * Provided options.
+   */
+  private final FlinkPipelineOptions options;
+
+  private final FlinkPipelineExecutionEnvironment flinkJobEnv;
+
+  /**
+   * Construct a runner from the provided options.
+   *
+   * @param options Properties which configure the runner.
+   * @return The newly created runner.
+   */
+  public static FlinkPipelineRunner fromOptions(PipelineOptions options) {
+    FlinkPipelineOptions flinkOptions =
+        PipelineOptionsValidator.validate(FlinkPipelineOptions.class, options);
+    ArrayList<String> missing = new ArrayList<>();
+
+    if (flinkOptions.getAppName() == null) {
+      missing.add("appName");
+    }
+    if (missing.size() > 0) {
+      throw new IllegalArgumentException(
+          "Missing required values: " + Joiner.on(',').join(missing));
+    }
+
+    if (flinkOptions.getFilesToStage() == null) {
+      flinkOptions.setFilesToStage(detectClassPathResourcesToStage(
+          DataflowPipelineRunner.class.getClassLoader()));
+      LOG.info("PipelineOptions.filesToStage was not specified. "
+              + "Defaulting to files from the classpath: will stage {} files. "
+              + "Enable logging at DEBUG level to see which files will be 
staged.",
+          flinkOptions.getFilesToStage().size());
+      LOG.debug("Classpath elements: {}", flinkOptions.getFilesToStage());
+    }
+
+    // Verify jobName according to service requirements.
+    String jobName = flinkOptions.getJobName().toLowerCase();
+    Preconditions.checkArgument(jobName.matches("[a-z]([-a-z0-9]*[a-z0-9])?"), 
"JobName invalid; " +
+        "the name must consist of only the characters " + "[-a-z0-9], starting 
with a letter " +
+        "and ending with a letter " + "or number");
+    Preconditions.checkArgument(jobName.length() <= 40,
+        "JobName too long; must be no more than 40 characters in length");
+
+    // Set Flink Master to [auto] if no option was specified.
+    if (flinkOptions.getFlinkMaster() == null) {
+      flinkOptions.setFlinkMaster("[auto]");
+    }
+
+    return new FlinkPipelineRunner(flinkOptions);
+  }
+
+  private FlinkPipelineRunner(FlinkPipelineOptions options) {
+    this.options = options;
+    this.flinkJobEnv = new FlinkPipelineExecutionEnvironment(options);
+  }
+
+  @Override
+  public FlinkRunnerResult run(Pipeline pipeline) {
+    LOG.info("Executing pipeline using FlinkPipelineRunner.");
+
+    LOG.info("Translating pipeline to Flink program.");
+
+    this.flinkJobEnv.translate(pipeline);
+
+    LOG.info("Starting execution of Flink program.");
+    
+    JobExecutionResult result;
+    try {
+      result = this.flinkJobEnv.executePipeline();
+    } catch (Exception e) {
+      LOG.error("Pipeline execution failed", e);
+      throw new RuntimeException("Pipeline execution failed", e);
+    }
+
+    LOG.info("Execution finished in {} msecs", result.getNetRuntime());
+
+    Map<String, Object> accumulators = result.getAllAccumulatorResults();
+    if (accumulators != null && !accumulators.isEmpty()) {
+      LOG.info("Final aggregator values:");
+
+      for (Map.Entry<String, Object> entry : 
result.getAllAccumulatorResults().entrySet()) {
+        LOG.info("{} : {}", entry.getKey(), entry.getValue());
+      }
+    }
+
+    return new FlinkRunnerResult(accumulators, result.getNetRuntime());
+  }
+
+  /**
+   * For testing.
+   */
+  public FlinkPipelineOptions getPipelineOptions() {
+    return options;
+  }
+
+  /**
+   * Constructs a runner with default properties for testing.
+   *
+   * @return The newly created runner.
+   */
+  public static FlinkPipelineRunner createForTest(boolean streaming) {
+    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    // we use [auto] for testing since this will make it pick up the Testing
+    // ExecutionEnvironment
+    options.setFlinkMaster("[auto]");
+    options.setStreaming(streaming);
+    return new FlinkPipelineRunner(options);
+  }
+
+  @Override
+  public <Output extends POutput, Input extends PInput> Output apply(
+      PTransform<Input, Output> transform, Input input) {
+    return super.apply(transform, input);
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  @Override
+  public String toString() {
+    return "DataflowPipelineRunner#" + hashCode();
+  }
+
+  /**
+   * Attempts to detect all the resources the class loader has access to. This 
does not recurse
+   * to class loader parents stopping it from pulling in resources from the 
system class loader.
+   *
+   * @param classLoader The URLClassLoader to use to detect resources to stage.
+   * @return A list of absolute paths to the resources the class loader uses.
+   * @throws IllegalArgumentException If either the class loader is not a 
URLClassLoader or one
+   *                                  of the resources the class loader 
exposes is not a file resource.
+   */
+  protected static List<String> detectClassPathResourcesToStage(ClassLoader 
classLoader) {
+    if (!(classLoader instanceof URLClassLoader)) {
+      String message = String.format("Unable to use ClassLoader to detect 
classpath elements. "
+          + "Current ClassLoader is %s, only URLClassLoaders are supported.", 
classLoader);
+      LOG.error(message);
+      throw new IllegalArgumentException(message);
+    }
+
+    List<String> files = new ArrayList<>();
+    for (URL url : ((URLClassLoader) classLoader).getURLs()) {
+      try {
+        files.add(new File(url.toURI()).getAbsolutePath());
+      } catch (IllegalArgumentException | URISyntaxException e) {
+        String message = String.format("Unable to convert url (%s) to file.", 
url);
+        LOG.error(message);
+        throw new IllegalArgumentException(message, e);
+      }
+    }
+    return files;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkRunnerResult.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkRunnerResult.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkRunnerResult.java
index 59b8b63..dfbaf66 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkRunnerResult.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkRunnerResult.java
@@ -29,38 +29,38 @@ import java.util.Map;
  * {@link com.google.cloud.dataflow.sdk.transforms.Aggregator}s.
  */
 public class FlinkRunnerResult implements PipelineResult {
-       
-       private final Map<String, Object> aggregators;
-       
-       private final long runtime;
-       
-       public FlinkRunnerResult(Map<String, Object> aggregators, long runtime) 
{
-               this.aggregators = (aggregators == null || 
aggregators.isEmpty()) ?
-                               Collections.<String, Object>emptyMap() :
-                               Collections.unmodifiableMap(aggregators);
-               
-               this.runtime = runtime;
-       }
+  
+  private final Map<String, Object> aggregators;
+  
+  private final long runtime;
+  
+  public FlinkRunnerResult(Map<String, Object> aggregators, long runtime) {
+    this.aggregators = (aggregators == null || aggregators.isEmpty()) ?
+        Collections.<String, Object>emptyMap() :
+        Collections.unmodifiableMap(aggregators);
+    
+    this.runtime = runtime;
+  }
 
-       @Override
-       public State getState() {
-               return null;
-       }
+  @Override
+  public State getState() {
+    return null;
+  }
 
-       @Override
-       public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, 
T> aggregator) throws AggregatorRetrievalException {
-               // TODO provide a list of all accumulator step values
-               Object value = aggregators.get(aggregator.getName());
-               if (value != null) {
-                       return new AggregatorValues<T>() {
-                               @Override
-                               public Map<String, T> getValuesAtSteps() {
-                                       return (Map<String, T>) aggregators;
-                               }
-                       };
-               } else {
-                       throw new AggregatorRetrievalException("Accumulator 
results not found.",
-                                       new RuntimeException("Accumulator does 
not exist."));
-               }
-       }
+  @Override
+  public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> 
aggregator) throws AggregatorRetrievalException {
+    // TODO provide a list of all accumulator step values
+    Object value = aggregators.get(aggregator.getName());
+    if (value != null) {
+      return new AggregatorValues<T>() {
+        @Override
+        public Map<String, T> getValuesAtSteps() {
+          return (Map<String, T>) aggregators;
+        }
+      };
+    } else {
+      throw new AggregatorRetrievalException("Accumulator results not found.",
+          new RuntimeException("Accumulator does not exist."));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/TFIDF.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/TFIDF.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/TFIDF.java
index 86a2695..8accae7 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/TFIDF.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/TFIDF.java
@@ -89,364 +89,364 @@ import java.util.Set;
  * {@code --input}.
  */
 public class TFIDF {
-       /**
-        * Options supported by {@link TFIDF}.
-        * <p>
-        * Inherits standard configuration options.
-        */
-       private interface Options extends PipelineOptions, FlinkPipelineOptions 
{
-               @Description("Path to the directory or GCS prefix containing 
files to read from")
-               @Default.String("gs://dataflow-samples/shakespeare/")
-               String getInput();
-               void setInput(String value);
-
-               @Description("Prefix of output URI to write to")
-               @Validation.Required
-               String getOutput();
-               void setOutput(String value);
-       }
-
-       /**
-        * Lists documents contained beneath the {@code options.input} 
prefix/directory.
-        */
-       public static Set<URI> listInputDocuments(Options options)
-                       throws URISyntaxException, IOException {
-               URI baseUri = new URI(options.getInput());
-
-               // List all documents in the directory or GCS prefix.
-               URI absoluteUri;
-               if (baseUri.getScheme() != null) {
-                       absoluteUri = baseUri;
-               } else {
-                       absoluteUri = new URI(
-                                       "file",
-                                       baseUri.getAuthority(),
-                                       baseUri.getPath(),
-                                       baseUri.getQuery(),
-                                       baseUri.getFragment());
-               }
-
-               Set<URI> uris = new HashSet<>();
-               if (absoluteUri.getScheme().equals("file")) {
-                       File directory = new File(absoluteUri);
-                       for (String entry : directory.list()) {
-                               File path = new File(directory, entry);
-                               uris.add(path.toURI());
-                       }
-               } else if (absoluteUri.getScheme().equals("gs")) {
-                       GcsUtil gcsUtil = 
options.as(GcsOptions.class).getGcsUtil();
-                       URI gcsUriGlob = new URI(
-                                       absoluteUri.getScheme(),
-                                       absoluteUri.getAuthority(),
-                                       absoluteUri.getPath() + "*",
-                                       absoluteUri.getQuery(),
-                                       absoluteUri.getFragment());
-                       for (GcsPath entry : 
gcsUtil.expand(GcsPath.fromUri(gcsUriGlob))) {
-                               uris.add(entry.toUri());
-                       }
-               }
-
-               return uris;
-       }
-
-       /**
-        * Reads the documents at the provided uris and returns all lines
-        * from the documents tagged with which document they are from.
-        */
-       public static class ReadDocuments
-                       extends PTransform<PInput, PCollection<KV<URI, 
String>>> {
-               private static final long serialVersionUID = 0;
-
-               private Iterable<URI> uris;
-
-               public ReadDocuments(Iterable<URI> uris) {
-                       this.uris = uris;
-               }
-
-               @Override
-               public Coder<?> getDefaultOutputCoder() {
-                       return KvCoder.of(StringDelegateCoder.of(URI.class), 
StringUtf8Coder.of());
-               }
-
-               @Override
-               public PCollection<KV<URI, String>> apply(PInput input) {
-                       Pipeline pipeline = input.getPipeline();
-
-                       // Create one TextIO.Read transform for each document
-                       // and add its output to a PCollectionList
-                       PCollectionList<KV<URI, String>> urisToLines =
-                                       PCollectionList.empty(pipeline);
-
-                       // TextIO.Read supports:
-                       //  - file: URIs and paths locally
-                       //  - gs: URIs on the service
-                       for (final URI uri : uris) {
-                               String uriString;
-                               if (uri.getScheme().equals("file")) {
-                                       uriString = new File(uri).getPath();
-                               } else {
-                                       uriString = uri.toString();
-                               }
-
-                               PCollection<KV<URI, String>> oneUriToLines = 
pipeline
-                                               
.apply(TextIO.Read.from(uriString)
-                                                               
.named("TextIO.Read(" + uriString + ")"))
-                                               .apply("WithKeys(" + uriString 
+ ")", WithKeys.<URI, String>of(uri));
-
-                               urisToLines = urisToLines.and(oneUriToLines);
-                       }
-
-                       return urisToLines.apply(Flatten.<KV<URI, 
String>>pCollections());
-               }
-       }
-
-       /**
-        * A transform containing a basic TF-IDF pipeline. The input consists 
of KV objects
-        * where the key is the document's URI and the value is a piece
-        * of the document's content. The output is mapping from terms to
-        * scores for each document URI.
-        */
-       public static class ComputeTfIdf
-                       extends PTransform<PCollection<KV<URI, String>>, 
PCollection<KV<String, KV<URI, Double>>>> {
-               private static final long serialVersionUID = 0;
-
-               public ComputeTfIdf() { }
-
-               @Override
-               public PCollection<KV<String, KV<URI, Double>>> apply(
-                               PCollection<KV<URI, String>> uriToContent) {
-
-                       // Compute the total number of documents, and
-                       // prepare this singleton PCollectionView for
-                       // use as a side input.
-                       final PCollectionView<Long> totalDocuments =
-                                       uriToContent
-                                                       .apply("GetURIs", 
Keys.<URI>create())
-                                                       
.apply("RemoveDuplicateDocs", RemoveDuplicates.<URI>create())
-                                                       
.apply(Count.<URI>globally())
-                                                       
.apply(View.<Long>asSingleton());
-
-                       // Create a collection of pairs mapping a URI to each
-                       // of the words in the document associated with that 
that URI.
-                       PCollection<KV<URI, String>> uriToWords = uriToContent
-                                       .apply(ParDo.named("SplitWords").of(
-                                                       new DoFn<KV<URI, 
String>, KV<URI, String>>() {
-                                                               private static 
final long serialVersionUID = 0;
-
-                                                               @Override
-                                                               public void 
processElement(ProcessContext c) {
-                                                                       URI uri 
= c.element().getKey();
-                                                                       String 
line = c.element().getValue();
-                                                                       for 
(String word : line.split("\\W+")) {
-                                                                               
// Log INFO messages when the word “love” is found.
-                                                                               
if (word.toLowerCase().equals("love")) {
-                                                                               
        LOG.info("Found {}", word.toLowerCase());
-                                                                               
}
-
-                                                                               
if (!word.isEmpty()) {
-                                                                               
        c.output(KV.of(uri, word.toLowerCase()));
-                                                                               
}
-                                                                       }
-                                                               }
-                                                       }));
-
-                       // Compute a mapping from each word to the total
-                       // number of documents in which it appears.
-                       PCollection<KV<String, Long>> wordToDocCount = 
uriToWords
-                                       .apply("RemoveDuplicateWords", 
RemoveDuplicates.<KV<URI, String>>create())
-                                       .apply(Values.<String>create())
-                                       .apply("CountDocs", 
Count.<String>perElement());
-
-                       // Compute a mapping from each URI to the total
-                       // number of words in the document associated with that 
URI.
-                       PCollection<KV<URI, Long>> uriToWordTotal = uriToWords
-                                       .apply("GetURIs2", Keys.<URI>create())
-                                       .apply("CountWords", 
Count.<URI>perElement());
-
-                       // Count, for each (URI, word) pair, the number of
-                       // occurrences of that word in the document associated
-                       // with the URI.
-                       PCollection<KV<KV<URI, String>, Long>> 
uriAndWordToCount = uriToWords
-                                       .apply("CountWordDocPairs", 
Count.<KV<URI, String>>perElement());
-
-                       // Adjust the above collection to a mapping from
-                       // (URI, word) pairs to counts into an isomorphic 
mapping
-                       // from URI to (word, count) pairs, to prepare for a 
join
-                       // by the URI key.
-                       PCollection<KV<URI, KV<String, Long>>> 
uriToWordAndCount = uriAndWordToCount
-                                       .apply(ParDo.named("ShiftKeys").of(
-                                                       new DoFn<KV<KV<URI, 
String>, Long>, KV<URI, KV<String, Long>>>() {
-                                                               private static 
final long serialVersionUID = 0;
-
-                                                               @Override
-                                                               public void 
processElement(ProcessContext c) {
-                                                                       URI uri 
= c.element().getKey().getKey();
-                                                                       String 
word = c.element().getKey().getValue();
-                                                                       Long 
occurrences = c.element().getValue();
-                                                                       
c.output(KV.of(uri, KV.of(word, occurrences)));
-                                                               }
-                                                       }));
-
-                       // Prepare to join the mapping of URI to (word, count) 
pairs with
-                       // the mapping of URI to total word counts, by 
associating
-                       // each of the input PCollection<KV<URI, ...>> with
-                       // a tuple tag. Each input must have the same key type, 
URI
-                       // in this case. The type parameter of the tuple tag 
matches
-                       // the types of the values for each collection.
-                       final TupleTag<Long> wordTotalsTag = new TupleTag<>();
-                       final TupleTag<KV<String, Long>> wordCountsTag = new 
TupleTag<>();
-                       KeyedPCollectionTuple<URI> coGbkInput = 
KeyedPCollectionTuple
-                                       .of(wordTotalsTag, uriToWordTotal)
-                                       .and(wordCountsTag, uriToWordAndCount);
-
-                       // Perform a CoGroupByKey (a sort of pre-join) on the 
prepared
-                       // inputs. This yields a mapping from URI to a 
CoGbkResult
-                       // (CoGroupByKey Result). The CoGbkResult is a mapping
-                       // from the above tuple tags to the values in each input
-                       // associated with a particular URI. In this case, each
-                       // KV<URI, CoGbkResult> group a URI with the total 
number of
-                       // words in that document as well as all the (word, 
count)
-                       // pairs for particular words.
-                       PCollection<KV<URI, CoGbkResult>> 
uriToWordAndCountAndTotal = coGbkInput
-                                       .apply("CoGroupByUri", 
CoGroupByKey.<URI>create());
-
-                       // Compute a mapping from each word to a (URI, term 
frequency)
-                       // pair for each URI. A word's term frequency for a 
document
-                       // is simply the number of times that word occurs in 
the document
-                       // divided by the total number of words in the document.
-                       PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf 
= uriToWordAndCountAndTotal
-                                       
.apply(ParDo.named("ComputeTermFrequencies").of(
-                                                       new DoFn<KV<URI, 
CoGbkResult>, KV<String, KV<URI, Double>>>() {
-                                                               private static 
final long serialVersionUID = 0;
-
-                                                               @Override
-                                                               public void 
processElement(ProcessContext c) {
-                                                                       URI uri 
= c.element().getKey();
-                                                                       Long 
wordTotal = c.element().getValue().getOnly(wordTotalsTag);
-
-                                                                       for 
(KV<String, Long> wordAndCount
-                                                                               
        : c.element().getValue().getAll(wordCountsTag)) {
-                                                                               
String word = wordAndCount.getKey();
-                                                                               
Long wordCount = wordAndCount.getValue();
-                                                                               
Double termFrequency = wordCount.doubleValue() / wordTotal.doubleValue();
-                                                                               
c.output(KV.of(word, KV.of(uri, termFrequency)));
-                                                                       }
-                                                               }
-                                                       }));
-
-                       // Compute a mapping from each word to its document 
frequency.
-                       // A word's document frequency in a corpus is the 
number of
-                       // documents in which the word appears divided by the 
total
-                       // number of documents in the corpus. Note how the 
total number of
-                       // documents is passed as a side input; the same value 
is
-                       // presented to each invocation of the DoFn.
-                       PCollection<KV<String, Double>> wordToDf = 
wordToDocCount
-                                       .apply(ParDo
-                                                       
.named("ComputeDocFrequencies")
-                                                       
.withSideInputs(totalDocuments)
-                                                       .of(new DoFn<KV<String, 
Long>, KV<String, Double>>() {
-                                                               private static 
final long serialVersionUID = 0;
-
-                                                               @Override
-                                                               public void 
processElement(ProcessContext c) {
-                                                                       String 
word = c.element().getKey();
-                                                                       Long 
documentCount = c.element().getValue();
-                                                                       Long 
documentTotal = c.sideInput(totalDocuments);
-                                                                       Double 
documentFrequency = documentCount.doubleValue()
-                                                                               
        / documentTotal.doubleValue();
-
-                                                                       
c.output(KV.of(word, documentFrequency));
-                                                               }
-                                                       }));
-
-                       // Join the term frequency and document frequency
-                       // collections, each keyed on the word.
-                       final TupleTag<KV<URI, Double>> tfTag = new 
TupleTag<>();
-                       final TupleTag<Double> dfTag = new TupleTag<>();
-                       PCollection<KV<String, CoGbkResult>> 
wordToUriAndTfAndDf = KeyedPCollectionTuple
-                                       .of(tfTag, wordToUriAndTf)
-                                       .and(dfTag, wordToDf)
-                                       .apply(CoGroupByKey.<String>create());
-
-                       // Compute a mapping from each word to a (URI, TF-IDF) 
score
-                       // for each URI. There are a variety of definitions of 
TF-IDF
-                       // ("term frequency - inverse document frequency") 
score;
-                       // here we use a basic version that is the term 
frequency
-                       // divided by the log of the document frequency.
-
-                       return wordToUriAndTfAndDf
-                                       .apply(ParDo.named("ComputeTfIdf").of(
-                                                       new DoFn<KV<String, 
CoGbkResult>, KV<String, KV<URI, Double>>>() {
-                                                               private static 
final long serialVersionUID1 = 0;
-
-                                                               @Override
-                                                               public void 
processElement(ProcessContext c) {
-                                                                       String 
word = c.element().getKey();
-                                                                       Double 
df = c.element().getValue().getOnly(dfTag);
-
-                                                                       for 
(KV<URI, Double> uriAndTf : c.element().getValue().getAll(tfTag)) {
-                                                                               
URI uri = uriAndTf.getKey();
-                                                                               
Double tf = uriAndTf.getValue();
-                                                                               
Double tfIdf = tf * Math.log(1 / df);
-                                                                               
c.output(KV.of(word, KV.of(uri, tfIdf)));
-                                                                       }
-                                                               }
-                                                       }));
-               }
-
-               // Instantiate Logger.
-               // It is suggested that the user specify the class name of the 
containing class
-               // (in this case ComputeTfIdf).
-               private static final Logger LOG = 
LoggerFactory.getLogger(ComputeTfIdf.class);
-       }
-
-       /**
-        * A {@link PTransform} to write, in CSV format, a mapping from term 
and URI
-        * to score.
-        */
-       public static class WriteTfIdf
-                       extends PTransform<PCollection<KV<String, KV<URI, 
Double>>>, PDone> {
-               private static final long serialVersionUID = 0;
-
-               private String output;
-
-               public WriteTfIdf(String output) {
-                       this.output = output;
-               }
-
-               @Override
-               public PDone apply(PCollection<KV<String, KV<URI, Double>>> 
wordToUriAndTfIdf) {
-                       return wordToUriAndTfIdf
-                                       .apply(ParDo.named("Format").of(new 
DoFn<KV<String, KV<URI, Double>>, String>() {
-                                               private static final long 
serialVersionUID = 0;
-
-                                               @Override
-                                               public void 
processElement(ProcessContext c) {
-                                                       
c.output(String.format("%s,\t%s,\t%f",
-                                                                       
c.element().getKey(),
-                                                                       
c.element().getValue().getKey(),
-                                                                       
c.element().getValue().getValue()));
-                                               }
-                                       }))
-                                       .apply(TextIO.Write
-                                                       .to(output)
-                                                       .withSuffix(".csv"));
-               }
-       }
-
-       public static void main(String[] args) throws Exception {
-               Options options = 
PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
-
-               options.setRunner(FlinkPipelineRunner.class);
-
-               Pipeline pipeline = Pipeline.create(options);
-               pipeline.getCoderRegistry().registerCoder(URI.class, 
StringDelegateCoder.of(URI.class));
-
-               pipeline
-                               .apply(new 
ReadDocuments(listInputDocuments(options)))
-                               .apply(new ComputeTfIdf())
-                               .apply(new WriteTfIdf(options.getOutput()));
-
-               pipeline.run();
-       }
+  /**
+   * Options supported by {@link TFIDF}.
+   * <p>
+   * Inherits standard configuration options.
+   */
+  private interface Options extends PipelineOptions, FlinkPipelineOptions {
+    @Description("Path to the directory or GCS prefix containing files to read 
from")
+    @Default.String("gs://dataflow-samples/shakespeare/")
+    String getInput();
+    void setInput(String value);
+
+    @Description("Prefix of output URI to write to")
+    @Validation.Required
+    String getOutput();
+    void setOutput(String value);
+  }
+
+  /**
+   * Lists documents contained beneath the {@code options.input} 
prefix/directory.
+   */
+  public static Set<URI> listInputDocuments(Options options)
+      throws URISyntaxException, IOException {
+    URI baseUri = new URI(options.getInput());
+
+    // List all documents in the directory or GCS prefix.
+    URI absoluteUri;
+    if (baseUri.getScheme() != null) {
+      absoluteUri = baseUri;
+    } else {
+      absoluteUri = new URI(
+          "file",
+          baseUri.getAuthority(),
+          baseUri.getPath(),
+          baseUri.getQuery(),
+          baseUri.getFragment());
+    }
+
+    Set<URI> uris = new HashSet<>();
+    if (absoluteUri.getScheme().equals("file")) {
+      File directory = new File(absoluteUri);
+      for (String entry : directory.list()) {
+        File path = new File(directory, entry);
+        uris.add(path.toURI());
+      }
+    } else if (absoluteUri.getScheme().equals("gs")) {
+      GcsUtil gcsUtil = options.as(GcsOptions.class).getGcsUtil();
+      URI gcsUriGlob = new URI(
+          absoluteUri.getScheme(),
+          absoluteUri.getAuthority(),
+          absoluteUri.getPath() + "*",
+          absoluteUri.getQuery(),
+          absoluteUri.getFragment());
+      for (GcsPath entry : gcsUtil.expand(GcsPath.fromUri(gcsUriGlob))) {
+        uris.add(entry.toUri());
+      }
+    }
+
+    return uris;
+  }
+
+  /**
+   * Reads the documents at the provided uris and returns all lines
+   * from the documents tagged with which document they are from.
+   */
+  public static class ReadDocuments
+      extends PTransform<PInput, PCollection<KV<URI, String>>> {
+    private static final long serialVersionUID = 0;
+
+    private Iterable<URI> uris;
+
+    public ReadDocuments(Iterable<URI> uris) {
+      this.uris = uris;
+    }
+
+    @Override
+    public Coder<?> getDefaultOutputCoder() {
+      return KvCoder.of(StringDelegateCoder.of(URI.class), 
StringUtf8Coder.of());
+    }
+
+    @Override
+    public PCollection<KV<URI, String>> apply(PInput input) {
+      Pipeline pipeline = input.getPipeline();
+
+      // Create one TextIO.Read transform for each document
+      // and add its output to a PCollectionList
+      PCollectionList<KV<URI, String>> urisToLines =
+          PCollectionList.empty(pipeline);
+
+      // TextIO.Read supports:
+      //  - file: URIs and paths locally
+      //  - gs: URIs on the service
+      for (final URI uri : uris) {
+        String uriString;
+        if (uri.getScheme().equals("file")) {
+          uriString = new File(uri).getPath();
+        } else {
+          uriString = uri.toString();
+        }
+
+        PCollection<KV<URI, String>> oneUriToLines = pipeline
+            .apply(TextIO.Read.from(uriString)
+                .named("TextIO.Read(" + uriString + ")"))
+            .apply("WithKeys(" + uriString + ")", WithKeys.<URI, 
String>of(uri));
+
+        urisToLines = urisToLines.and(oneUriToLines);
+      }
+
+      return urisToLines.apply(Flatten.<KV<URI, String>>pCollections());
+    }
+  }
+
+  /**
+   * A transform containing a basic TF-IDF pipeline. The input consists of KV 
objects
+   * where the key is the document's URI and the value is a piece
+   * of the document's content. The output is a mapping from terms to
+   * scores for each document URI.
+   */
+  public static class ComputeTfIdf
+      extends PTransform<PCollection<KV<URI, String>>, PCollection<KV<String, 
KV<URI, Double>>>> {
+    private static final long serialVersionUID = 0;
+
+    public ComputeTfIdf() { }
+
+    @Override
+    public PCollection<KV<String, KV<URI, Double>>> apply(
+        PCollection<KV<URI, String>> uriToContent) {
+
+      // Compute the total number of documents, and
+      // prepare this singleton PCollectionView for
+      // use as a side input.
+      final PCollectionView<Long> totalDocuments =
+          uriToContent
+              .apply("GetURIs", Keys.<URI>create())
+              .apply("RemoveDuplicateDocs", RemoveDuplicates.<URI>create())
+              .apply(Count.<URI>globally())
+              .apply(View.<Long>asSingleton());
+
+      // Create a collection of pairs mapping a URI to each
+      // of the words in the document associated with that URI.
+      PCollection<KV<URI, String>> uriToWords = uriToContent
+          .apply(ParDo.named("SplitWords").of(
+              new DoFn<KV<URI, String>, KV<URI, String>>() {
+                private static final long serialVersionUID = 0;
+
+                @Override
+                public void processElement(ProcessContext c) {
+                  URI uri = c.element().getKey();
+                  String line = c.element().getValue();
+                  for (String word : line.split("\\W+")) {
+                    // Log INFO messages when the word “love” is found.
+                    if (word.toLowerCase().equals("love")) {
+                      LOG.info("Found {}", word.toLowerCase());
+                    }
+
+                    if (!word.isEmpty()) {
+                      c.output(KV.of(uri, word.toLowerCase()));
+                    }
+                  }
+                }
+              }));
+
+      // Compute a mapping from each word to the total
+      // number of documents in which it appears.
+      PCollection<KV<String, Long>> wordToDocCount = uriToWords
+          .apply("RemoveDuplicateWords", RemoveDuplicates.<KV<URI, 
String>>create())
+          .apply(Values.<String>create())
+          .apply("CountDocs", Count.<String>perElement());
+
+      // Compute a mapping from each URI to the total
+      // number of words in the document associated with that URI.
+      PCollection<KV<URI, Long>> uriToWordTotal = uriToWords
+          .apply("GetURIs2", Keys.<URI>create())
+          .apply("CountWords", Count.<URI>perElement());
+
+      // Count, for each (URI, word) pair, the number of
+      // occurrences of that word in the document associated
+      // with the URI.
+      PCollection<KV<KV<URI, String>, Long>> uriAndWordToCount = uriToWords
+          .apply("CountWordDocPairs", Count.<KV<URI, String>>perElement());
+
+      // Adjust the above collection to a mapping from
+      // (URI, word) pairs to counts into an isomorphic mapping
+      // from URI to (word, count) pairs, to prepare for a join
+      // by the URI key.
+      PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = 
uriAndWordToCount
+          .apply(ParDo.named("ShiftKeys").of(
+              new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() 
{
+                private static final long serialVersionUID = 0;
+
+                @Override
+                public void processElement(ProcessContext c) {
+                  URI uri = c.element().getKey().getKey();
+                  String word = c.element().getKey().getValue();
+                  Long occurrences = c.element().getValue();
+                  c.output(KV.of(uri, KV.of(word, occurrences)));
+                }
+              }));
+
+      // Prepare to join the mapping of URI to (word, count) pairs with
+      // the mapping of URI to total word counts, by associating
+      // each of the input PCollection<KV<URI, ...>> with
+      // a tuple tag. Each input must have the same key type, URI
+      // in this case. The type parameter of the tuple tag matches
+      // the types of the values for each collection.
+      final TupleTag<Long> wordTotalsTag = new TupleTag<>();
+      final TupleTag<KV<String, Long>> wordCountsTag = new TupleTag<>();
+      KeyedPCollectionTuple<URI> coGbkInput = KeyedPCollectionTuple
+          .of(wordTotalsTag, uriToWordTotal)
+          .and(wordCountsTag, uriToWordAndCount);
+
+      // Perform a CoGroupByKey (a sort of pre-join) on the prepared
+      // inputs. This yields a mapping from URI to a CoGbkResult
+      // (CoGroupByKey Result). The CoGbkResult is a mapping
+      // from the above tuple tags to the values in each input
+      // associated with a particular URI. In this case, each
+      // KV<URI, CoGbkResult> groups a URI with the total number of
+      // words in that document as well as all the (word, count)
+      // pairs for particular words.
+      PCollection<KV<URI, CoGbkResult>> uriToWordAndCountAndTotal = coGbkInput
+          .apply("CoGroupByUri", CoGroupByKey.<URI>create());
+
+      // Compute a mapping from each word to a (URI, term frequency)
+      // pair for each URI. A word's term frequency for a document
+      // is simply the number of times that word occurs in the document
+      // divided by the total number of words in the document.
+      PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = 
uriToWordAndCountAndTotal
+          .apply(ParDo.named("ComputeTermFrequencies").of(
+              new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+                private static final long serialVersionUID = 0;
+
+                @Override
+                public void processElement(ProcessContext c) {
+                  URI uri = c.element().getKey();
+                  Long wordTotal = 
c.element().getValue().getOnly(wordTotalsTag);
+
+                  for (KV<String, Long> wordAndCount
+                      : c.element().getValue().getAll(wordCountsTag)) {
+                    String word = wordAndCount.getKey();
+                    Long wordCount = wordAndCount.getValue();
+                    Double termFrequency = wordCount.doubleValue() / 
wordTotal.doubleValue();
+                    c.output(KV.of(word, KV.of(uri, termFrequency)));
+                  }
+                }
+              }));
+
+      // Compute a mapping from each word to its document frequency.
+      // A word's document frequency in a corpus is the number of
+      // documents in which the word appears divided by the total
+      // number of documents in the corpus. Note how the total number of
+      // documents is passed as a side input; the same value is
+      // presented to each invocation of the DoFn.
+      PCollection<KV<String, Double>> wordToDf = wordToDocCount
+          .apply(ParDo
+              .named("ComputeDocFrequencies")
+              .withSideInputs(totalDocuments)
+              .of(new DoFn<KV<String, Long>, KV<String, Double>>() {
+                private static final long serialVersionUID = 0;
+
+                @Override
+                public void processElement(ProcessContext c) {
+                  String word = c.element().getKey();
+                  Long documentCount = c.element().getValue();
+                  Long documentTotal = c.sideInput(totalDocuments);
+                  Double documentFrequency = documentCount.doubleValue()
+                      / documentTotal.doubleValue();
+
+                  c.output(KV.of(word, documentFrequency));
+                }
+              }));
+
+      // Join the term frequency and document frequency
+      // collections, each keyed on the word.
+      final TupleTag<KV<URI, Double>> tfTag = new TupleTag<>();
+      final TupleTag<Double> dfTag = new TupleTag<>();
+      PCollection<KV<String, CoGbkResult>> wordToUriAndTfAndDf = 
KeyedPCollectionTuple
+          .of(tfTag, wordToUriAndTf)
+          .and(dfTag, wordToDf)
+          .apply(CoGroupByKey.<String>create());
+
+      // Compute a mapping from each word to a (URI, TF-IDF) score
+      // for each URI. There are a variety of definitions of TF-IDF
+      // ("term frequency - inverse document frequency") score;
+      // here we use a basic version that is the term frequency
+      // divided by the log of the document frequency.
+
+      return wordToUriAndTfAndDf
+          .apply(ParDo.named("ComputeTfIdf").of(
+              new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() 
{
+                private static final long serialVersionUID1 = 0;
+
+                @Override
+                public void processElement(ProcessContext c) {
+                  String word = c.element().getKey();
+                  Double df = c.element().getValue().getOnly(dfTag);
+
+                  for (KV<URI, Double> uriAndTf : 
c.element().getValue().getAll(tfTag)) {
+                    URI uri = uriAndTf.getKey();
+                    Double tf = uriAndTf.getValue();
+                    Double tfIdf = tf * Math.log(1 / df);
+                    c.output(KV.of(word, KV.of(uri, tfIdf)));
+                  }
+                }
+              }));
+    }
+
+    // Instantiate Logger.
+    // It is suggested that the user specify the class name of the containing 
class
+    // (in this case ComputeTfIdf).
+    private static final Logger LOG = 
LoggerFactory.getLogger(ComputeTfIdf.class);
+  }
+
+  /**
+   * A {@link PTransform} to write, in CSV format, a mapping from term and URI
+   * to score.
+   */
+  public static class WriteTfIdf
+      extends PTransform<PCollection<KV<String, KV<URI, Double>>>, PDone> {
+    private static final long serialVersionUID = 0;
+
+    private String output;
+
+    public WriteTfIdf(String output) {
+      this.output = output;
+    }
+
+    @Override
+    public PDone apply(PCollection<KV<String, KV<URI, Double>>> 
wordToUriAndTfIdf) {
+      return wordToUriAndTfIdf
+          .apply(ParDo.named("Format").of(new DoFn<KV<String, KV<URI, 
Double>>, String>() {
+            private static final long serialVersionUID = 0;
+
+            @Override
+            public void processElement(ProcessContext c) {
+              c.output(String.format("%s,\t%s,\t%f",
+                  c.element().getKey(),
+                  c.element().getValue().getKey(),
+                  c.element().getValue().getValue()));
+            }
+          }))
+          .apply(TextIO.Write
+              .to(output)
+              .withSuffix(".csv"));
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    Options options = 
PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+
+    options.setRunner(FlinkPipelineRunner.class);
+
+    Pipeline pipeline = Pipeline.create(options);
+    pipeline.getCoderRegistry().registerCoder(URI.class, 
StringDelegateCoder.of(URI.class));
+
+    pipeline
+        .apply(new ReadDocuments(listInputDocuments(options)))
+        .apply(new ComputeTfIdf())
+        .apply(new WriteTfIdf(options.getOutput()));
+
+    pipeline.run();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/WordCount.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/WordCount.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/WordCount.java
index e737fe8..4f721b4 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/WordCount.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/WordCount.java
@@ -29,83 +29,83 @@ import com.google.cloud.dataflow.sdk.values.PCollection;
 
 public class WordCount {
 
-       public static class ExtractWordsFn extends DoFn<String, String> {
-               private final Aggregator<Long, Long> emptyLines =
-                               createAggregator("emptyLines", new Sum.SumLongFn());
-
-               @Override
-               public void processElement(ProcessContext c) {
-                       if (c.element().trim().isEmpty()) {
-                               emptyLines.addValue(1L);
-                       }
-
-                       // Split the line into words.
-                       String[] words = c.element().split("[^a-zA-Z']+");
-
-                       // Output each word encountered into the output PCollection.
-                       for (String word : words) {
-                               if (!word.isEmpty()) {
-                                       c.output(word);
-                               }
-                       }
-               }
-       }
-
-       public static class CountWords extends PTransform<PCollection<String>,
+  public static class ExtractWordsFn extends DoFn<String, String> {
+    private final Aggregator<Long, Long> emptyLines =
+        createAggregator("emptyLines", new Sum.SumLongFn());
+
+    @Override
+    public void processElement(ProcessContext c) {
+      if (c.element().trim().isEmpty()) {
+        emptyLines.addValue(1L);
+      }
+
+      // Split the line into words.
+      String[] words = c.element().split("[^a-zA-Z']+");
+
+      // Output each word encountered into the output PCollection.
+      for (String word : words) {
+        if (!word.isEmpty()) {
+          c.output(word);
+        }
+      }
+    }
+  }
+
+  public static class CountWords extends PTransform<PCollection<String>,
                     PCollection<KV<String, Long>>> {
-               @Override
-               public PCollection<KV<String, Long>> apply(PCollection<String> lines) {
-
-                       // Convert lines of text into individual words.
-                       PCollection<String> words = lines.apply(
-                                       ParDo.of(new ExtractWordsFn()));
-
-                       // Count the number of times each word occurs.
-                       PCollection<KV<String, Long>> wordCounts =
-                                       words.apply(Count.<String>perElement());
-
-                       return wordCounts;
-               }
-       }
-
-       /** A SimpleFunction that converts a Word and Count into a printable string. */
-       public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
-               @Override
-               public String apply(KV<String, Long> input) {
-                       return input.getKey() + ": " + input.getValue();
-               }
-       }
-
-       /**
-        * Options supported by {@link WordCount}.
-        * <p>
-        * Inherits standard configuration options.
-        */
-       public interface Options extends PipelineOptions, FlinkPipelineOptions {
-               @Description("Path of the file to read from")
-               @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
-               String getInput();
-               void setInput(String value);
-
-               @Description("Path of the file to write to")
-               String getOutput();
-               void setOutput(String value);
-       }
-
-       public static void main(String[] args) {
-
-               Options options = PipelineOptionsFactory.fromArgs(args).withValidation()
-                               .as(Options.class);
-               options.setRunner(FlinkPipelineRunner.class);
-
-               Pipeline p = Pipeline.create(options);
-
-               p.apply(TextIO.Read.named("ReadLines").from(options.getInput()))
-                               .apply(new CountWords())
-                               .apply(MapElements.via(new FormatAsTextFn()))
-                               .apply(TextIO.Write.named("WriteCounts").to(options.getOutput()));
-
-               p.run();
-       }
+    @Override
+    public PCollection<KV<String, Long>> apply(PCollection<String> lines) {
+
+      // Convert lines of text into individual words.
+      PCollection<String> words = lines.apply(
+          ParDo.of(new ExtractWordsFn()));
+
+      // Count the number of times each word occurs.
+      PCollection<KV<String, Long>> wordCounts =
+          words.apply(Count.<String>perElement());
+
+      return wordCounts;
+    }
+  }
+
+  /** A SimpleFunction that converts a Word and Count into a printable string. */
+  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
+    @Override
+    public String apply(KV<String, Long> input) {
+      return input.getKey() + ": " + input.getValue();
+    }
+  }
+
+  /**
+   * Options supported by {@link WordCount}.
+   * <p>
+   * Inherits standard configuration options.
+   */
+  public interface Options extends PipelineOptions, FlinkPipelineOptions {
+    @Description("Path of the file to read from")
+    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
+    String getInput();
+    void setInput(String value);
+
+    @Description("Path of the file to write to")
+    String getOutput();
+    void setOutput(String value);
+  }
+
+  public static void main(String[] args) {
+
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation()
+        .as(Options.class);
+    options.setRunner(FlinkPipelineRunner.class);
+
+    Pipeline p = Pipeline.create(options);
+
+    p.apply(TextIO.Read.named("ReadLines").from(options.getInput()))
+        .apply(new CountWords())
+        .apply(MapElements.via(new FormatAsTextFn()))
+        .apply(TextIO.Write.named("WriteCounts").to(options.getOutput()));
+
+    p.run();
+  }
 
 }

Reply via email to