http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java
index 60f6788..a6e1e37 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java
@@ -50,109 +50,109 @@ import org.joda.time.Duration;
  * */
 public class JoinExamples {
 
-       static PCollection<String> joinEvents(PCollection<String> streamA,
-                                                                               
  PCollection<String> streamB) throws Exception {
-
-               final TupleTag<String> firstInfoTag = new TupleTag<>();
-               final TupleTag<String> secondInfoTag = new TupleTag<>();
-
-               // transform both input collections to tuple collections, where 
the keys are country
-               // codes in both cases.
-               PCollection<KV<String, String>> firstInfo = streamA.apply(
-                               ParDo.of(new ExtractEventDataFn()));
-               PCollection<KV<String, String>> secondInfo = streamB.apply(
-                               ParDo.of(new ExtractEventDataFn()));
-
-               // country code 'key' -> CGBKR (<event info>, <country name>)
-               PCollection<KV<String, CoGbkResult>> kvpCollection = 
KeyedPCollectionTuple
-                               .of(firstInfoTag, firstInfo)
-                               .and(secondInfoTag, secondInfo)
-                               .apply(CoGroupByKey.<String>create());
-
-               // Process the CoGbkResult elements generated by the 
CoGroupByKey transform.
-               // country code 'key' -> string of <event info>, <country name>
-               PCollection<KV<String, String>> finalResultCollection =
-                               kvpCollection.apply(ParDo.named("Process").of(
-                                               new DoFn<KV<String, 
CoGbkResult>, KV<String, String>>() {
-                                                       private static final 
long serialVersionUID = 0;
-
-                                                       @Override
-                                                       public void 
processElement(ProcessContext c) {
-                                                               KV<String, 
CoGbkResult> e = c.element();
-                                                               String key = 
e.getKey();
-
-                                                               String defaultA 
= "NO_VALUE";
-
-                                                               // the 
following getOnly is a bit tricky because it expects to have
-                                                               // EXACTLY ONE 
value in the corresponding stream and for the corresponding key.
-
-                                                               String lineA = 
e.getValue().getOnly(firstInfoTag, defaultA);
-                                                               for (String 
lineB : c.element().getValue().getAll(secondInfoTag)) {
-                                                                       // 
Generate a string that combines information from both collection values
-                                                                       
c.output(KV.of(key, "Value A: " + lineA + " - Value B: " + lineB));
-                                                               }
-                                                       }
-                                               }));
-
-               return finalResultCollection
-                               .apply(ParDo.named("Format").of(new 
DoFn<KV<String, String>, String>() {
-                                       private static final long 
serialVersionUID = 0;
-
-                                       @Override
-                                       public void 
processElement(ProcessContext c) {
-                                               String result = 
c.element().getKey() + " -> " + c.element().getValue();
-                                               System.out.println(result);
-                                               c.output(result);
-                                       }
-                               }));
-       }
-
-       static class ExtractEventDataFn extends DoFn<String, KV<String, 
String>> {
-               private static final long serialVersionUID = 0;
-
-               @Override
-               public void processElement(ProcessContext c) {
-                       String line = c.element().toLowerCase();
-                       String key = line.split("\\s")[0];
-                       c.output(KV.of(key, line));
-               }
-       }
-
-       private interface Options extends 
WindowedWordCount.StreamingWordCountOptions {
-
-       }
-
-       public static void main(String[] args) throws Exception {
-               Options options = 
PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
-               options.setStreaming(true);
-               options.setCheckpointingInterval(1000L);
-               options.setNumberOfExecutionRetries(5);
-               options.setExecutionRetryDelay(3000L);
-               options.setRunner(FlinkPipelineRunner.class);
-
-               PTransform<? super PBegin, PCollection<String>> readSourceA =
-                               Read.from(new 
UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("FirstStream");
-               PTransform<? super PBegin, PCollection<String>> readSourceB =
-                               Read.from(new 
UnboundedSocketSource<>("localhost", 9998, '\n', 3)).named("SecondStream");
-
-               WindowFn<Object, ?> windowFn = 
FixedWindows.of(Duration.standardSeconds(options.getWindowSize()));
-
-               Pipeline p = Pipeline.create(options);
-
-               // the following two 'applys' create multiple inputs to our 
pipeline, one for each
-               // of our two input sources.
-               PCollection<String> streamA = p.apply(readSourceA)
-                               .apply(Window.<String>into(windowFn)
-                                               
.triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
-                                               .discardingFiredPanes());
-               PCollection<String> streamB = p.apply(readSourceB)
-                               .apply(Window.<String>into(windowFn)
-                                               
.triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
-                                               .discardingFiredPanes());
-
-               PCollection<String> formattedResults = joinEvents(streamA, 
streamB);
-               formattedResults.apply(TextIO.Write.to("./outputJoin.txt"));
-               p.run();
-       }
+  static PCollection<String> joinEvents(PCollection<String> streamA,
+                      PCollection<String> streamB) throws Exception {
+
+    final TupleTag<String> firstInfoTag = new TupleTag<>();
+    final TupleTag<String> secondInfoTag = new TupleTag<>();
+
+    // transform both input collections to tuple collections, where the keys 
are country
+    // codes in both cases.
+    PCollection<KV<String, String>> firstInfo = streamA.apply(
+        ParDo.of(new ExtractEventDataFn()));
+    PCollection<KV<String, String>> secondInfo = streamB.apply(
+        ParDo.of(new ExtractEventDataFn()));
+
+    // country code 'key' -> CGBKR (<event info>, <country name>)
+    PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple
+        .of(firstInfoTag, firstInfo)
+        .and(secondInfoTag, secondInfo)
+        .apply(CoGroupByKey.<String>create());
+
+    // Process the CoGbkResult elements generated by the CoGroupByKey 
transform.
+    // country code 'key' -> string of <event info>, <country name>
+    PCollection<KV<String, String>> finalResultCollection =
+        kvpCollection.apply(ParDo.named("Process").of(
+            new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
+              private static final long serialVersionUID = 0;
+
+              @Override
+              public void processElement(ProcessContext c) {
+                KV<String, CoGbkResult> e = c.element();
+                String key = e.getKey();
+
+                String defaultA = "NO_VALUE";
+
+                // the following getOnly is a bit tricky because it expects to 
have
+                // EXACTLY ONE value in the corresponding stream and for the 
corresponding key.
+
+                String lineA = e.getValue().getOnly(firstInfoTag, defaultA);
+                for (String lineB : 
c.element().getValue().getAll(secondInfoTag)) {
+                  // Generate a string that combines information from both 
collection values
+                  c.output(KV.of(key, "Value A: " + lineA + " - Value B: " + 
lineB));
+                }
+              }
+            }));
+
+    return finalResultCollection
+        .apply(ParDo.named("Format").of(new DoFn<KV<String, String>, String>() 
{
+          private static final long serialVersionUID = 0;
+
+          @Override
+          public void processElement(ProcessContext c) {
+            String result = c.element().getKey() + " -> " + 
c.element().getValue();
+            System.out.println(result);
+            c.output(result);
+          }
+        }));
+  }
+
+  static class ExtractEventDataFn extends DoFn<String, KV<String, String>> {
+    private static final long serialVersionUID = 0;
+
+    @Override
+    public void processElement(ProcessContext c) {
+      String line = c.element().toLowerCase();
+      String key = line.split("\\s")[0];
+      c.output(KV.of(key, line));
+    }
+  }
+
+  private interface Options extends 
WindowedWordCount.StreamingWordCountOptions {
+
+  }
+
+  public static void main(String[] args) throws Exception {
+    Options options = 
PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+    options.setStreaming(true);
+    options.setCheckpointingInterval(1000L);
+    options.setNumberOfExecutionRetries(5);
+    options.setExecutionRetryDelay(3000L);
+    options.setRunner(FlinkPipelineRunner.class);
+
+    PTransform<? super PBegin, PCollection<String>> readSourceA =
+        Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 
3)).named("FirstStream");
+    PTransform<? super PBegin, PCollection<String>> readSourceB =
+        Read.from(new UnboundedSocketSource<>("localhost", 9998, '\n', 
3)).named("SecondStream");
+
+    WindowFn<Object, ?> windowFn = 
FixedWindows.of(Duration.standardSeconds(options.getWindowSize()));
+
+    Pipeline p = Pipeline.create(options);
+
+    // the following two 'applys' create multiple inputs to our pipeline, one 
for each
+    // of our two input sources.
+    PCollection<String> streamA = p.apply(readSourceA)
+        .apply(Window.<String>into(windowFn)
+            
.triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
+            .discardingFiredPanes());
+    PCollection<String> streamB = p.apply(readSourceB)
+        .apply(Window.<String>into(windowFn)
+            
.triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
+            .discardingFiredPanes());
+
+    PCollection<String> formattedResults = joinEvents(streamA, streamB);
+    formattedResults.apply(TextIO.Write.to("./outputJoin.txt"));
+    p.run();
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java
index dba2721..b97c35c 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java
@@ -36,106 +36,106 @@ import java.util.Properties;
 
 public class KafkaWindowedWordCountExample {
 
-       static final String KAFKA_TOPIC = "test";  // Default kafka topic to 
read from
-       static final String KAFKA_BROKER = "localhost:9092";  // Default kafka 
broker to contact
-       static final String GROUP_ID = "myGroup";  // Default groupId
-       static final String ZOOKEEPER = "localhost:2181";  // Default zookeeper 
to connect to for Kafka
-
-       public static class ExtractWordsFn extends DoFn<String, String> {
-               private final Aggregator<Long, Long> emptyLines =
-                               createAggregator("emptyLines", new 
Sum.SumLongFn());
-
-               @Override
-               public void processElement(ProcessContext c) {
-                       if (c.element().trim().isEmpty()) {
-                               emptyLines.addValue(1L);
-                       }
-
-                       // Split the line into words.
-                       String[] words = c.element().split("[^a-zA-Z']+");
-
-                       // Output each word encountered into the output 
PCollection.
-                       for (String word : words) {
-                               if (!word.isEmpty()) {
-                                       c.output(word);
-                               }
-                       }
-               }
-       }
-
-       public static class FormatAsStringFn extends DoFn<KV<String, Long>, 
String> {
-               @Override
-               public void processElement(ProcessContext c) {
-                       String row = c.element().getKey() + " - " + 
c.element().getValue() + " @ " + c.timestamp().toString();
-                       System.out.println(row);
-                       c.output(row);
-               }
-       }
-
-       public interface KafkaStreamingWordCountOptions extends 
WindowedWordCount.StreamingWordCountOptions {
-               @Description("The Kafka topic to read from")
-               @Default.String(KAFKA_TOPIC)
-               String getKafkaTopic();
-
-               void setKafkaTopic(String value);
-
-               @Description("The Kafka Broker to read from")
-               @Default.String(KAFKA_BROKER)
-               String getBroker();
-
-               void setBroker(String value);
-
-               @Description("The Zookeeper server to connect to")
-               @Default.String(ZOOKEEPER)
-               String getZookeeper();
-
-               void setZookeeper(String value);
-
-               @Description("The groupId")
-               @Default.String(GROUP_ID)
-               String getGroup();
-
-               void setGroup(String value);
-
-       }
-
-       public static void main(String[] args) {
-               
PipelineOptionsFactory.register(KafkaStreamingWordCountOptions.class);
-               KafkaStreamingWordCountOptions options = 
PipelineOptionsFactory.fromArgs(args).as(KafkaStreamingWordCountOptions.class);
-               options.setJobName("KafkaExample");
-               options.setStreaming(true);
-               options.setCheckpointingInterval(1000L);
-               options.setNumberOfExecutionRetries(5);
-               options.setExecutionRetryDelay(3000L);
-               options.setRunner(FlinkPipelineRunner.class);
-
-               System.out.println(options.getKafkaTopic() +" "+ 
options.getZookeeper() +" "+ options.getBroker() +" "+ options.getGroup() );
-               Pipeline pipeline = Pipeline.create(options);
-
-               Properties p = new Properties();
-               p.setProperty("zookeeper.connect", options.getZookeeper());
-               p.setProperty("bootstrap.servers", options.getBroker());
-               p.setProperty("group.id", options.getGroup());
-
-               // this is the Flink consumer that reads the input to
-               // the program from a kafka topic.
-               FlinkKafkaConsumer082 kafkaConsumer = new 
FlinkKafkaConsumer082<>(
-                               options.getKafkaTopic(),
-                               new SimpleStringSchema(), p);
-
-               PCollection<String> words = pipeline
-                               .apply(Read.from(new 
UnboundedFlinkSource<String, UnboundedSource.CheckpointMark>(options, 
kafkaConsumer)).named("StreamingWordCount"))
-                               .apply(ParDo.of(new ExtractWordsFn()))
-                               
.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(options.getWindowSize())))
-                                               
.triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
-                                               .discardingFiredPanes());
-
-               PCollection<KV<String, Long>> wordCounts =
-                               words.apply(Count.<String>perElement());
-
-               wordCounts.apply(ParDo.of(new FormatAsStringFn()))
-                               .apply(TextIO.Write.to("./outputKafka.txt"));
-
-               pipeline.run();
-       }
+  static final String KAFKA_TOPIC = "test";  // Default kafka topic to read 
from
+  static final String KAFKA_BROKER = "localhost:9092";  // Default kafka 
broker to contact
+  static final String GROUP_ID = "myGroup";  // Default groupId
+  static final String ZOOKEEPER = "localhost:2181";  // Default zookeeper to 
connect to for Kafka
+
+  public static class ExtractWordsFn extends DoFn<String, String> {
+    private final Aggregator<Long, Long> emptyLines =
+        createAggregator("emptyLines", new Sum.SumLongFn());
+
+    @Override
+    public void processElement(ProcessContext c) {
+      if (c.element().trim().isEmpty()) {
+        emptyLines.addValue(1L);
+      }
+
+      // Split the line into words.
+      String[] words = c.element().split("[^a-zA-Z']+");
+
+      // Output each word encountered into the output PCollection.
+      for (String word : words) {
+        if (!word.isEmpty()) {
+          c.output(word);
+        }
+      }
+    }
+  }
+
+  public static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
+    @Override
+    public void processElement(ProcessContext c) {
+      String row = c.element().getKey() + " - " + c.element().getValue() + " @ 
" + c.timestamp().toString();
+      System.out.println(row);
+      c.output(row);
+    }
+  }
+
+  public interface KafkaStreamingWordCountOptions extends 
WindowedWordCount.StreamingWordCountOptions {
+    @Description("The Kafka topic to read from")
+    @Default.String(KAFKA_TOPIC)
+    String getKafkaTopic();
+
+    void setKafkaTopic(String value);
+
+    @Description("The Kafka Broker to read from")
+    @Default.String(KAFKA_BROKER)
+    String getBroker();
+
+    void setBroker(String value);
+
+    @Description("The Zookeeper server to connect to")
+    @Default.String(ZOOKEEPER)
+    String getZookeeper();
+
+    void setZookeeper(String value);
+
+    @Description("The groupId")
+    @Default.String(GROUP_ID)
+    String getGroup();
+
+    void setGroup(String value);
+
+  }
+
+  public static void main(String[] args) {
+    PipelineOptionsFactory.register(KafkaStreamingWordCountOptions.class);
+    KafkaStreamingWordCountOptions options = 
PipelineOptionsFactory.fromArgs(args).as(KafkaStreamingWordCountOptions.class);
+    options.setJobName("KafkaExample");
+    options.setStreaming(true);
+    options.setCheckpointingInterval(1000L);
+    options.setNumberOfExecutionRetries(5);
+    options.setExecutionRetryDelay(3000L);
+    options.setRunner(FlinkPipelineRunner.class);
+
+    System.out.println(options.getKafkaTopic() +" "+ options.getZookeeper() +" 
"+ options.getBroker() +" "+ options.getGroup() );
+    Pipeline pipeline = Pipeline.create(options);
+
+    Properties p = new Properties();
+    p.setProperty("zookeeper.connect", options.getZookeeper());
+    p.setProperty("bootstrap.servers", options.getBroker());
+    p.setProperty("group.id", options.getGroup());
+
+    // this is the Flink consumer that reads the input to
+    // the program from a kafka topic.
+    FlinkKafkaConsumer082 kafkaConsumer = new FlinkKafkaConsumer082<>(
+        options.getKafkaTopic(),
+        new SimpleStringSchema(), p);
+
+    PCollection<String> words = pipeline
+        .apply(Read.from(new UnboundedFlinkSource<String, 
UnboundedSource.CheckpointMark>(options, 
kafkaConsumer)).named("StreamingWordCount"))
+        .apply(ParDo.of(new ExtractWordsFn()))
+        
.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(options.getWindowSize())))
+            
.triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
+            .discardingFiredPanes());
+
+    PCollection<KV<String, Long>> wordCounts =
+        words.apply(Count.<String>perElement());
+
+    wordCounts.apply(ParDo.of(new FormatAsStringFn()))
+        .apply(TextIO.Write.to("./outputKafka.txt"));
+
+    pipeline.run();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java
index 37dc39a..753cbc3 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java
@@ -45,84 +45,84 @@ import java.io.IOException;
  * */
 public class WindowedWordCount {
 
-       private static final Logger LOG = 
LoggerFactory.getLogger(WindowedWordCount.class);
-
-       static final long WINDOW_SIZE = 10;  // Default window duration in 
seconds
-       static final long SLIDE_SIZE = 5;  // Default window slide in seconds
-
-       static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
-               @Override
-               public void processElement(ProcessContext c) {
-                       String row = c.element().getKey() + " - " + 
c.element().getValue() + " @ " + c.timestamp().toString();
-                       c.output(row);
-               }
-       }
-
-       static class ExtractWordsFn extends DoFn<String, String> {
-               private final Aggregator<Long, Long> emptyLines =
-                               createAggregator("emptyLines", new 
Sum.SumLongFn());
-
-               @Override
-               public void processElement(ProcessContext c) {
-                       if (c.element().trim().isEmpty()) {
-                               emptyLines.addValue(1L);
-                       }
-
-                       // Split the line into words.
-                       String[] words = c.element().split("[^a-zA-Z']+");
-
-                       // Output each word encountered into the output 
PCollection.
-                       for (String word : words) {
-                               if (!word.isEmpty()) {
-                                       c.output(word);
-                               }
-                       }
-               }
-       }
-
-       public interface StreamingWordCountOptions extends 
com.dataartisans.flink.dataflow.examples.WordCount.Options {
-               @Description("Sliding window duration, in seconds")
-               @Default.Long(WINDOW_SIZE)
-               Long getWindowSize();
-
-               void setWindowSize(Long value);
-
-               @Description("Window slide, in seconds")
-               @Default.Long(SLIDE_SIZE)
-               Long getSlide();
-
-               void setSlide(Long value);
-       }
-
-       public static void main(String[] args) throws IOException {
-               StreamingWordCountOptions options = 
PipelineOptionsFactory.fromArgs(args).withValidation().as(StreamingWordCountOptions.class);
-               options.setStreaming(true);
-               options.setWindowSize(10L);
-               options.setSlide(5L);
-               options.setCheckpointingInterval(1000L);
-               options.setNumberOfExecutionRetries(5);
-               options.setExecutionRetryDelay(3000L);
-               options.setRunner(FlinkPipelineRunner.class);
-
-               LOG.info("Windpwed WordCount with Sliding Windows of " + 
options.getWindowSize() +
-                               " sec. and a slide of " + options.getSlide());
-
-               Pipeline pipeline = Pipeline.create(options);
-
-               PCollection<String> words = pipeline
-                               .apply(Read.from(new 
UnboundedSocketSource<>("localhost", 9999, '\n', 
3)).named("StreamingWordCount"))
-                               .apply(ParDo.of(new ExtractWordsFn()))
-                               
.apply(Window.<String>into(SlidingWindows.of(Duration.standardSeconds(options.getWindowSize()))
-                                               
.every(Duration.standardSeconds(options.getSlide())))
-                                               
.triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
-                                               .discardingFiredPanes());
-
-               PCollection<KV<String, Long>> wordCounts =
-                               words.apply(Count.<String>perElement());
-
-               wordCounts.apply(ParDo.of(new FormatAsStringFn()))
-                               
.apply(TextIO.Write.to("./outputWordCount.txt"));
-
-               pipeline.run();
-       }
+  private static final Logger LOG = 
LoggerFactory.getLogger(WindowedWordCount.class);
+
+  static final long WINDOW_SIZE = 10;  // Default window duration in seconds
+  static final long SLIDE_SIZE = 5;  // Default window slide in seconds
+
+  static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
+    @Override
+    public void processElement(ProcessContext c) {
+      String row = c.element().getKey() + " - " + c.element().getValue() + " @ 
" + c.timestamp().toString();
+      c.output(row);
+    }
+  }
+
+  static class ExtractWordsFn extends DoFn<String, String> {
+    private final Aggregator<Long, Long> emptyLines =
+        createAggregator("emptyLines", new Sum.SumLongFn());
+
+    @Override
+    public void processElement(ProcessContext c) {
+      if (c.element().trim().isEmpty()) {
+        emptyLines.addValue(1L);
+      }
+
+      // Split the line into words.
+      String[] words = c.element().split("[^a-zA-Z']+");
+
+      // Output each word encountered into the output PCollection.
+      for (String word : words) {
+        if (!word.isEmpty()) {
+          c.output(word);
+        }
+      }
+    }
+  }
+
+  public interface StreamingWordCountOptions extends 
com.dataartisans.flink.dataflow.examples.WordCount.Options {
+    @Description("Sliding window duration, in seconds")
+    @Default.Long(WINDOW_SIZE)
+    Long getWindowSize();
+
+    void setWindowSize(Long value);
+
+    @Description("Window slide, in seconds")
+    @Default.Long(SLIDE_SIZE)
+    Long getSlide();
+
+    void setSlide(Long value);
+  }
+
+  public static void main(String[] args) throws IOException {
+    StreamingWordCountOptions options = 
PipelineOptionsFactory.fromArgs(args).withValidation().as(StreamingWordCountOptions.class);
+    options.setStreaming(true);
+    options.setWindowSize(10L);
+    options.setSlide(5L);
+    options.setCheckpointingInterval(1000L);
+    options.setNumberOfExecutionRetries(5);
+    options.setExecutionRetryDelay(3000L);
+    options.setRunner(FlinkPipelineRunner.class);
+
+    LOG.info("Windpwed WordCount with Sliding Windows of " + 
options.getWindowSize() +
+        " sec. and a slide of " + options.getSlide());
+
+    Pipeline pipeline = Pipeline.create(options);
+
+    PCollection<String> words = pipeline
+        .apply(Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 
3)).named("StreamingWordCount"))
+        .apply(ParDo.of(new ExtractWordsFn()))
+        
.apply(Window.<String>into(SlidingWindows.of(Duration.standardSeconds(options.getWindowSize()))
+            .every(Duration.standardSeconds(options.getSlide())))
+            
.triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
+            .discardingFiredPanes());
+
+    PCollection<KV<String, Long>> wordCounts =
+        words.apply(Count.<String>perElement());
+
+    wordCounts.apply(ParDo.of(new FormatAsStringFn()))
+        .apply(TextIO.Write.to("./outputWordCount.txt"));
+
+    pipeline.run();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/io/ConsoleIO.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/io/ConsoleIO.java 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/io/ConsoleIO.java
index 90fb635..3f3492c 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/io/ConsoleIO.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/io/ConsoleIO.java
@@ -28,53 +28,53 @@ import com.google.cloud.dataflow.sdk.values.PDone;
  */
 public class ConsoleIO {
 
-       /**
-        * A PTransform that writes a PCollection to a standard output.
-        */
-       public static class Write {
+  /**
+   * A PTransform that writes a PCollection to a standard output.
+   */
+  public static class Write {
 
-               /**
-                * Returns a ConsoleIO.Write PTransform with a default step 
name.
-                */
-               public static Bound create() {
-                       return new Bound();
-               }
+    /**
+     * Returns a ConsoleIO.Write PTransform with a default step name.
+     */
+    public static Bound create() {
+      return new Bound();
+    }
 
-               /**
-                * Returns a ConsoleIO.Write PTransform with the given step 
name.
-                */
-               public static Bound named(String name) {
-                       return new Bound().named(name);
-               }
+    /**
+     * Returns a ConsoleIO.Write PTransform with the given step name.
+     */
+    public static Bound named(String name) {
+      return new Bound().named(name);
+    }
 
-               /**
-                * A PTransform that writes a bounded PCollection to standard 
output.
-                */
-               public static class Bound extends PTransform<PCollection<?>, 
PDone> {
-                       private static final long serialVersionUID = 0;
+    /**
+     * A PTransform that writes a bounded PCollection to standard output.
+     */
+    public static class Bound extends PTransform<PCollection<?>, PDone> {
+      private static final long serialVersionUID = 0;
 
-                       Bound() {
-                               super("ConsoleIO.Write");
-                       }
+      Bound() {
+        super("ConsoleIO.Write");
+      }
 
-                       Bound(String name) {
-                               super(name);
-                       }
+      Bound(String name) {
+        super(name);
+      }
 
-                       /**
-                        * Returns a new ConsoleIO.Write PTransform that's like 
this one but with the given
-                        * step
-                        * name.  Does not modify this object.
-                        */
-                       public Bound named(String name) {
-                               return new Bound(name);
-                       }
+      /**
+       * Returns a new ConsoleIO.Write PTransform that's like this one but 
with the given
+       * step
+       * name.  Does not modify this object.
+       */
+      public Bound named(String name) {
+        return new Bound(name);
+      }
 
-                       @Override
-                       public PDone apply(PCollection<?> input) {
-                               return PDone.in(input.getPipeline());
-                       }
-               }
-       }
+      @Override
+      public PDone apply(PCollection<?> input) {
+        return PDone.in(input.getPipeline());
+      }
+    }
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchPipelineTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchPipelineTranslator.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchPipelineTranslator.java
index a1e4410..82b7e97 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchPipelineTranslator.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchPipelineTranslator.java
@@ -31,119 +31,119 @@ import org.apache.flink.api.java.ExecutionEnvironment;
  */
 public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
 
-       /**
-        * The necessary context in the case of a batch job.
-        */
-       private final FlinkBatchTranslationContext batchContext;
-
-       private int depth = 0;
-
-       /**
-        * Composite transform that we want to translate before proceeding with 
other transforms.
-        */
-       private PTransform<?, ?> currentCompositeTransform;
-
-       public FlinkBatchPipelineTranslator(ExecutionEnvironment env, 
PipelineOptions options) {
-               this.batchContext = new FlinkBatchTranslationContext(env, 
options);
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Pipeline Visitor Methods
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public void enterCompositeTransform(TransformTreeNode node) {
-               System.out.println(genSpaces(this.depth) + 
"enterCompositeTransform- " + formatNodeName(node));
-
-               PTransform<?, ?> transform = node.getTransform();
-               if (transform != null && currentCompositeTransform == null) {
-
-                       BatchTransformTranslator<?> translator = 
FlinkBatchTransformTranslators.getTranslator(transform);
-                       if (translator != null) {
-                               currentCompositeTransform = transform;
-                               if (transform instanceof CoGroupByKey && 
node.getInput().expand().size() != 2) {
-                                       // we can only optimize CoGroupByKey 
for input size 2
-                                       currentCompositeTransform = null;
-                               }
-                       }
-               }
-               this.depth++;
-       }
-
-       @Override
-       public void leaveCompositeTransform(TransformTreeNode node) {
-               PTransform<?, ?> transform = node.getTransform();
-               if (transform != null && currentCompositeTransform == 
transform) {
-
-                       BatchTransformTranslator<?> translator = 
FlinkBatchTransformTranslators.getTranslator(transform);
-                       if (translator != null) {
-                               System.out.println(genSpaces(this.depth) + 
"doingCompositeTransform- " + formatNodeName(node));
-                               applyBatchTransform(transform, node, 
translator);
-                               currentCompositeTransform = null;
-                       } else {
-                               throw new IllegalStateException("Attempted to 
translate composite transform " +
-                                               "but no translator was found: " 
+ currentCompositeTransform);
-                       }
-               }
-               this.depth--;
-               System.out.println(genSpaces(this.depth) + 
"leaveCompositeTransform- " + formatNodeName(node));
-       }
-
-       @Override
-       public void visitTransform(TransformTreeNode node) {
-               System.out.println(genSpaces(this.depth) + "visitTransform- " + 
formatNodeName(node));
-               if (currentCompositeTransform != null) {
-                       // ignore it
-                       return;
-               }
-
-               // get the transformation corresponding to hte node we are
-               // currently visiting and translate it into its Flink 
alternative.
-
-               PTransform<?, ?> transform = node.getTransform();
-               BatchTransformTranslator<?> translator = 
FlinkBatchTransformTranslators.getTranslator(transform);
-               if (translator == null) {
-                       System.out.println(node.getTransform().getClass());
-                       throw new UnsupportedOperationException("The transform 
" + transform + " is currently not supported.");
-               }
-               applyBatchTransform(transform, node, translator);
-       }
-
-       @Override
-       public void visitValue(PValue value, TransformTreeNode producer) {
-               // do nothing here
-       }
-
-       private <T extends PTransform<?, ?>> void 
applyBatchTransform(PTransform<?, ?> transform, TransformTreeNode node, 
BatchTransformTranslator<?> translator) {
-
-               @SuppressWarnings("unchecked")
-               T typedTransform = (T) transform;
-
-               @SuppressWarnings("unchecked")
-               BatchTransformTranslator<T> typedTranslator = 
(BatchTransformTranslator<T>) translator;
-
-               // create the applied PTransform on the batchContext
-               batchContext.setCurrentTransform(AppliedPTransform.of(
-                               node.getFullName(), node.getInput(), 
node.getOutput(), (PTransform) transform));
-               typedTranslator.translateNode(typedTransform, batchContext);
-       }
-
-       /**
-        * A translator of a {@link PTransform}.
-        */
-       public interface BatchTransformTranslator<Type extends PTransform> {
-               void translateNode(Type transform, FlinkBatchTranslationContext 
context);
-       }
-
-       private static String genSpaces(int n) {
-               String s = "";
-               for (int i = 0; i < n; i++) {
-                       s += "|   ";
-               }
-               return s;
-       }
-
-       private static String formatNodeName(TransformTreeNode node) {
-               return node.toString().split("@")[1] + node.getTransform();
-       }
+  /**
+   * The necessary context in the case of a batch job.
+   */
+  private final FlinkBatchTranslationContext batchContext;
+
+  private int depth = 0;
+
+  /**
+   * Composite transform that we want to translate before proceeding with 
other transforms.
+   */
+  private PTransform<?, ?> currentCompositeTransform;
+
+  public FlinkBatchPipelineTranslator(ExecutionEnvironment env, 
PipelineOptions options) {
+    this.batchContext = new FlinkBatchTranslationContext(env, options);
+  }
+
+  // 
--------------------------------------------------------------------------------------------
+  //  Pipeline Visitor Methods
+  // 
--------------------------------------------------------------------------------------------
+
+  @Override
+  public void enterCompositeTransform(TransformTreeNode node) {
+    System.out.println(genSpaces(this.depth) + "enterCompositeTransform- " + 
formatNodeName(node));
+
+    PTransform<?, ?> transform = node.getTransform();
+    if (transform != null && currentCompositeTransform == null) {
+
+      BatchTransformTranslator<?> translator = 
FlinkBatchTransformTranslators.getTranslator(transform);
+      if (translator != null) {
+        currentCompositeTransform = transform;
+        if (transform instanceof CoGroupByKey && 
node.getInput().expand().size() != 2) {
+          // we can only optimize CoGroupByKey for input size 2
+          currentCompositeTransform = null;
+        }
+      }
+    }
+    this.depth++;
+  }
+
+  @Override
+  public void leaveCompositeTransform(TransformTreeNode node) {
+    PTransform<?, ?> transform = node.getTransform();
+    if (transform != null && currentCompositeTransform == transform) {
+
+      BatchTransformTranslator<?> translator = 
FlinkBatchTransformTranslators.getTranslator(transform);
+      if (translator != null) {
+        System.out.println(genSpaces(this.depth) + "doingCompositeTransform- " 
+ formatNodeName(node));
+        applyBatchTransform(transform, node, translator);
+        currentCompositeTransform = null;
+      } else {
+        throw new IllegalStateException("Attempted to translate composite 
transform " +
+            "but no translator was found: " + currentCompositeTransform);
+      }
+    }
+    this.depth--;
+    System.out.println(genSpaces(this.depth) + "leaveCompositeTransform- " + 
formatNodeName(node));
+  }
+
+  @Override
+  public void visitTransform(TransformTreeNode node) {
+    System.out.println(genSpaces(this.depth) + "visitTransform- " + 
formatNodeName(node));
+    if (currentCompositeTransform != null) {
+      // ignore it
+      return;
+    }
+
+    // get the transformation corresponding to the node we are
+    // currently visiting and translate it into its Flink alternative.
+
+    PTransform<?, ?> transform = node.getTransform();
+    BatchTransformTranslator<?> translator = 
FlinkBatchTransformTranslators.getTranslator(transform);
+    if (translator == null) {
+      System.out.println(node.getTransform().getClass());
+      throw new UnsupportedOperationException("The transform " + transform + " 
is currently not supported.");
+    }
+    applyBatchTransform(transform, node, translator);
+  }
+
+  @Override
+  public void visitValue(PValue value, TransformTreeNode producer) {
+    // do nothing here
+  }
+
+  private <T extends PTransform<?, ?>> void applyBatchTransform(PTransform<?, 
?> transform, TransformTreeNode node, BatchTransformTranslator<?> translator) {
+
+    @SuppressWarnings("unchecked")
+    T typedTransform = (T) transform;
+
+    @SuppressWarnings("unchecked")
+    BatchTransformTranslator<T> typedTranslator = 
(BatchTransformTranslator<T>) translator;
+
+    // create the applied PTransform on the batchContext
+    batchContext.setCurrentTransform(AppliedPTransform.of(
+        node.getFullName(), node.getInput(), node.getOutput(), (PTransform) 
transform));
+    typedTranslator.translateNode(typedTransform, batchContext);
+  }
+
+  /**
+   * A translator of a {@link PTransform}.
+   */
+  public interface BatchTransformTranslator<Type extends PTransform> {
+    void translateNode(Type transform, FlinkBatchTranslationContext context);
+  }
+
+  private static String genSpaces(int n) {
+    String s = "";
+    for (int i = 0; i < n; i++) {
+      s += "|   ";
+    }
+    return s;
+  }
+
+  private static String formatNodeName(TransformTreeNode node) {
+    return node.toString().split("@")[1] + node.getTransform();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java
index 0e45a21..6a8409c 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java
@@ -93,502 +93,502 @@ import java.util.Map;
  */
 public class FlinkBatchTransformTranslators {
 
-       // 
--------------------------------------------------------------------------------------------
-       //  Transform Translator Registry
-       // 
--------------------------------------------------------------------------------------------
-       
-       @SuppressWarnings("rawtypes")
-       private static final Map<Class<? extends PTransform>, 
FlinkBatchPipelineTranslator.BatchTransformTranslator> TRANSLATORS = new 
HashMap<>();
+  // 
--------------------------------------------------------------------------------------------
+  //  Transform Translator Registry
+  // 
--------------------------------------------------------------------------------------------
+  
+  @SuppressWarnings("rawtypes")
+  private static final Map<Class<? extends PTransform>, 
FlinkBatchPipelineTranslator.BatchTransformTranslator> TRANSLATORS = new 
HashMap<>();
 
-       // register the known translators
-       static {
-               TRANSLATORS.put(View.CreatePCollectionView.class, new 
CreatePCollectionViewTranslatorBatch());
+  // register the known translators
+  static {
+    TRANSLATORS.put(View.CreatePCollectionView.class, new 
CreatePCollectionViewTranslatorBatch());
 
-               TRANSLATORS.put(Combine.PerKey.class, new 
CombinePerKeyTranslatorBatch());
-               // we don't need this because we translate the Combine.PerKey 
directly
-               //TRANSLATORS.put(Combine.GroupedValues.class, new 
CombineGroupedValuesTranslator());
+    TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslatorBatch());
+    // we don't need this because we translate the Combine.PerKey directly
+    //TRANSLATORS.put(Combine.GroupedValues.class, new 
CombineGroupedValuesTranslator());
 
-               TRANSLATORS.put(Create.Values.class, new 
CreateTranslatorBatch());
+    TRANSLATORS.put(Create.Values.class, new CreateTranslatorBatch());
 
-               TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new 
FlattenPCollectionTranslatorBatch());
+    TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new 
FlattenPCollectionTranslatorBatch());
 
-               TRANSLATORS.put(GroupByKey.GroupByKeyOnly.class, new 
GroupByKeyOnlyTranslatorBatch());
-               // TODO we're currently ignoring windows here but that has to 
change in the future
-               TRANSLATORS.put(GroupByKey.class, new 
GroupByKeyTranslatorBatch());
-
-               TRANSLATORS.put(ParDo.BoundMulti.class, new 
ParDoBoundMultiTranslatorBatch());
-               TRANSLATORS.put(ParDo.Bound.class, new 
ParDoBoundTranslatorBatch());
-
-               TRANSLATORS.put(CoGroupByKey.class, new 
CoGroupByKeyTranslatorBatch());
-
-               TRANSLATORS.put(AvroIO.Read.Bound.class, new 
AvroIOReadTranslatorBatch());
-               TRANSLATORS.put(AvroIO.Write.Bound.class, new 
AvroIOWriteTranslatorBatch());
-
-               TRANSLATORS.put(Read.Bounded.class, new 
ReadSourceTranslatorBatch());
-               TRANSLATORS.put(Write.Bound.class, new 
WriteSinkTranslatorBatch());
-
-               TRANSLATORS.put(TextIO.Read.Bound.class, new 
TextIOReadTranslatorBatch());
-               TRANSLATORS.put(TextIO.Write.Bound.class, new 
TextIOWriteTranslatorBatch());
-
-               // Flink-specific
-               TRANSLATORS.put(ConsoleIO.Write.Bound.class, new 
ConsoleIOWriteTranslatorBatch());
-
-       }
-
-
-       public static FlinkBatchPipelineTranslator.BatchTransformTranslator<?> 
getTranslator(PTransform<?, ?> transform) {
-               return TRANSLATORS.get(transform.getClass());
-       }
-
-       private static class ReadSourceTranslatorBatch<T> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<Read.Bounded<T>> {
-
-               @Override
-               public void translateNode(Read.Bounded<T> transform, 
FlinkBatchTranslationContext context) {
-                       String name = transform.getName();
-                       BoundedSource<T> source = transform.getSource();
-                       PCollection<T> output = context.getOutput(transform);
-                       Coder<T> coder = output.getCoder();
-
-                       TypeInformation<T> typeInformation = 
context.getTypeInfo(output);
-
-                       DataSource<T> dataSource = new 
DataSource<>(context.getExecutionEnvironment(),
-                                       new SourceInputFormat<>(source, 
context.getPipelineOptions()), typeInformation, name);
-
-                       context.setOutputDataSet(output, dataSource);
-               }
-       }
-
-       private static class AvroIOReadTranslatorBatch<T> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<AvroIO.Read.Bound<T>> {
-               private static final Logger LOG = 
LoggerFactory.getLogger(AvroIOReadTranslatorBatch.class);
-
-               @Override
-               public void translateNode(AvroIO.Read.Bound<T> transform, 
FlinkBatchTranslationContext context) {
-                       String path = transform.getFilepattern();
-                       String name = transform.getName();
-//                     Schema schema = transform.getSchema();
-                       PValue output = context.getOutput(transform);
-
-                       TypeInformation<T> typeInformation = 
context.getTypeInfo(output);
-
-                       // This is super hacky, but unfortunately we cannot get 
the type otherwise
-                       Class<T> extractedAvroType;
-                       try {
-                               Field typeField = 
transform.getClass().getDeclaredField("type");
-                               typeField.setAccessible(true);
-                               @SuppressWarnings("unchecked")
-                               Class<T> avroType = (Class<T>) 
typeField.get(transform);
-                               extractedAvroType = avroType;
-                       } catch (NoSuchFieldException | IllegalAccessException 
e) {
-                               // we know that the field is there and it is 
accessible
-                               throw new RuntimeException("Could not access 
type from AvroIO.Bound", e);
-                       }
-
-                       DataSource<T> source = new 
DataSource<>(context.getExecutionEnvironment(),
-                                       new AvroInputFormat<>(new Path(path), 
extractedAvroType),
-                                       typeInformation, name);
-
-                       context.setOutputDataSet(output, source);
-               }
-       }
-
-       private static class AvroIOWriteTranslatorBatch<T> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<AvroIO.Write.Bound<T>> {
-               private static final Logger LOG = 
LoggerFactory.getLogger(AvroIOWriteTranslatorBatch.class);
-
-               @Override
-               public void translateNode(AvroIO.Write.Bound<T> transform, 
FlinkBatchTranslationContext context) {
-                       DataSet<T> inputDataSet = 
context.getInputDataSet(context.getInput(transform));
-                       String filenamePrefix = transform.getFilenamePrefix();
-                       String filenameSuffix = transform.getFilenameSuffix();
-                       int numShards = transform.getNumShards();
-                       String shardNameTemplate = 
transform.getShardNameTemplate();
-
-                       // TODO: Implement these. We need Flink support for 
this.
-                       LOG.warn("Translation of TextIO.Write.filenameSuffix 
not yet supported. Is: {}.",
-                                       filenameSuffix);
-                       LOG.warn("Translation of TextIO.Write.shardNameTemplate 
not yet supported. Is: {}.", shardNameTemplate);
-
-                       // This is super hacky, but unfortunately we cannot get 
the type otherwise
-                       Class<T> extractedAvroType;
-                       try {
-                               Field typeField = 
transform.getClass().getDeclaredField("type");
-                               typeField.setAccessible(true);
-                               @SuppressWarnings("unchecked")
-                               Class<T> avroType = (Class<T>) 
typeField.get(transform);
-                               extractedAvroType = avroType;
-                       } catch (NoSuchFieldException | IllegalAccessException 
e) {
-                               // we know that the field is there and it is 
accessible
-                               throw new RuntimeException("Could not access 
type from AvroIO.Bound", e);
-                       }
-
-                       DataSink<T> dataSink = inputDataSet.output(new 
AvroOutputFormat<>(new Path
-                                       (filenamePrefix), extractedAvroType));
-
-                       if (numShards > 0) {
-                               dataSink.setParallelism(numShards);
-                       }
-               }
-       }
-
-       private static class TextIOReadTranslatorBatch implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<TextIO.Read.Bound<String>>
 {
-               private static final Logger LOG = 
LoggerFactory.getLogger(TextIOReadTranslatorBatch.class);
-
-               @Override
-               public void translateNode(TextIO.Read.Bound<String> transform, 
FlinkBatchTranslationContext context) {
-                       String path = transform.getFilepattern();
-                       String name = transform.getName();
-
-                       TextIO.CompressionType compressionType = 
transform.getCompressionType();
-                       boolean needsValidation = transform.needsValidation();
-
-                       // TODO: Implement these. We need Flink support for 
this.
-                       LOG.warn("Translation of TextIO.CompressionType not yet 
supported. Is: {}.", compressionType);
-                       LOG.warn("Translation of TextIO.Read.needsValidation 
not yet supported. Is: {}.", needsValidation);
-
-                       PValue output = context.getOutput(transform);
-
-                       TypeInformation<String> typeInformation = 
context.getTypeInfo(output);
-                       DataSource<String> source = new 
DataSource<>(context.getExecutionEnvironment(), new TextInputFormat(new 
Path(path)), typeInformation, name);
-
-                       context.setOutputDataSet(output, source);
-               }
-       }
-
-       private static class TextIOWriteTranslatorBatch<T> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<TextIO.Write.Bound<T>> {
-               private static final Logger LOG = 
LoggerFactory.getLogger(TextIOWriteTranslatorBatch.class);
-
-               @Override
-               public void translateNode(TextIO.Write.Bound<T> transform, 
FlinkBatchTranslationContext context) {
-                       PValue input = context.getInput(transform);
-                       DataSet<T> inputDataSet = 
context.getInputDataSet(input);
-
-                       String filenamePrefix = transform.getFilenamePrefix();
-                       String filenameSuffix = transform.getFilenameSuffix();
-                       boolean needsValidation = transform.needsValidation();
-                       int numShards = transform.getNumShards();
-                       String shardNameTemplate = 
transform.getShardNameTemplate();
-
-                       // TODO: Implement these. We need Flink support for 
this.
-                       LOG.warn("Translation of TextIO.Write.needsValidation 
not yet supported. Is: {}.", needsValidation);
-                       LOG.warn("Translation of TextIO.Write.filenameSuffix 
not yet supported. Is: {}.", filenameSuffix);
-                       LOG.warn("Translation of TextIO.Write.shardNameTemplate 
not yet supported. Is: {}.", shardNameTemplate);
-
-                       //inputDataSet.print();
-                       DataSink<T> dataSink = 
inputDataSet.writeAsText(filenamePrefix);
-
-                       if (numShards > 0) {
-                               dataSink.setParallelism(numShards);
-                       }
-               }
-       }
-
-       private static class ConsoleIOWriteTranslatorBatch implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<ConsoleIO.Write.Bound> {
-               @Override
-               public void translateNode(ConsoleIO.Write.Bound transform, 
FlinkBatchTranslationContext context) {
-                       PValue input = context.getInput(transform);
-                       DataSet<?> inputDataSet = 
context.getInputDataSet(input);
-                       inputDataSet.printOnTaskManager(transform.getName());
-               }
-       }
+    TRANSLATORS.put(GroupByKey.GroupByKeyOnly.class, new 
GroupByKeyOnlyTranslatorBatch());
+    // TODO we're currently ignoring windows here but that has to change in 
the future
+    TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslatorBatch());
+
+    TRANSLATORS.put(ParDo.BoundMulti.class, new 
ParDoBoundMultiTranslatorBatch());
+    TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundTranslatorBatch());
+
+    TRANSLATORS.put(CoGroupByKey.class, new CoGroupByKeyTranslatorBatch());
+
+    TRANSLATORS.put(AvroIO.Read.Bound.class, new AvroIOReadTranslatorBatch());
+    TRANSLATORS.put(AvroIO.Write.Bound.class, new 
AvroIOWriteTranslatorBatch());
+
+    TRANSLATORS.put(Read.Bounded.class, new ReadSourceTranslatorBatch());
+    TRANSLATORS.put(Write.Bound.class, new WriteSinkTranslatorBatch());
+
+    TRANSLATORS.put(TextIO.Read.Bound.class, new TextIOReadTranslatorBatch());
+    TRANSLATORS.put(TextIO.Write.Bound.class, new 
TextIOWriteTranslatorBatch());
+
+    // Flink-specific
+    TRANSLATORS.put(ConsoleIO.Write.Bound.class, new 
ConsoleIOWriteTranslatorBatch());
+
+  }
+
+
+  public static FlinkBatchPipelineTranslator.BatchTransformTranslator<?> 
getTranslator(PTransform<?, ?> transform) {
+    return TRANSLATORS.get(transform.getClass());
+  }
+
+  private static class ReadSourceTranslatorBatch<T> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<Read.Bounded<T>> {
+
+    @Override
+    public void translateNode(Read.Bounded<T> transform, 
FlinkBatchTranslationContext context) {
+      String name = transform.getName();
+      BoundedSource<T> source = transform.getSource();
+      PCollection<T> output = context.getOutput(transform);
+      Coder<T> coder = output.getCoder();
+
+      TypeInformation<T> typeInformation = context.getTypeInfo(output);
+
+      DataSource<T> dataSource = new 
DataSource<>(context.getExecutionEnvironment(),
+          new SourceInputFormat<>(source, context.getPipelineOptions()), 
typeInformation, name);
+
+      context.setOutputDataSet(output, dataSource);
+    }
+  }
+
+  private static class AvroIOReadTranslatorBatch<T> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<AvroIO.Read.Bound<T>> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AvroIOReadTranslatorBatch.class);
+
+    @Override
+    public void translateNode(AvroIO.Read.Bound<T> transform, 
FlinkBatchTranslationContext context) {
+      String path = transform.getFilepattern();
+      String name = transform.getName();
+//      Schema schema = transform.getSchema();
+      PValue output = context.getOutput(transform);
+
+      TypeInformation<T> typeInformation = context.getTypeInfo(output);
+
+      // This is super hacky, but unfortunately we cannot get the type 
otherwise
+      Class<T> extractedAvroType;
+      try {
+        Field typeField = transform.getClass().getDeclaredField("type");
+        typeField.setAccessible(true);
+        @SuppressWarnings("unchecked")
+        Class<T> avroType = (Class<T>) typeField.get(transform);
+        extractedAvroType = avroType;
+      } catch (NoSuchFieldException | IllegalAccessException e) {
+        // we know that the field is there and it is accessible
+        throw new RuntimeException("Could not access type from AvroIO.Bound", 
e);
+      }
+
+      DataSource<T> source = new 
DataSource<>(context.getExecutionEnvironment(),
+          new AvroInputFormat<>(new Path(path), extractedAvroType),
+          typeInformation, name);
+
+      context.setOutputDataSet(output, source);
+    }
+  }
+
+  private static class AvroIOWriteTranslatorBatch<T> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<AvroIO.Write.Bound<T>> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AvroIOWriteTranslatorBatch.class);
+
+    @Override
+    public void translateNode(AvroIO.Write.Bound<T> transform, 
FlinkBatchTranslationContext context) {
+      DataSet<T> inputDataSet = 
context.getInputDataSet(context.getInput(transform));
+      String filenamePrefix = transform.getFilenamePrefix();
+      String filenameSuffix = transform.getFilenameSuffix();
+      int numShards = transform.getNumShards();
+      String shardNameTemplate = transform.getShardNameTemplate();
+
+      // TODO: Implement these. We need Flink support for this.
+      LOG.warn("Translation of TextIO.Write.filenameSuffix not yet supported. 
Is: {}.",
+          filenameSuffix);
+      LOG.warn("Translation of TextIO.Write.shardNameTemplate not yet 
supported. Is: {}.", shardNameTemplate);
+
+      // This is super hacky, but unfortunately we cannot get the type 
otherwise
+      Class<T> extractedAvroType;
+      try {
+        Field typeField = transform.getClass().getDeclaredField("type");
+        typeField.setAccessible(true);
+        @SuppressWarnings("unchecked")
+        Class<T> avroType = (Class<T>) typeField.get(transform);
+        extractedAvroType = avroType;
+      } catch (NoSuchFieldException | IllegalAccessException e) {
+        // we know that the field is there and it is accessible
+        throw new RuntimeException("Could not access type from AvroIO.Bound", 
e);
+      }
+
+      DataSink<T> dataSink = inputDataSet.output(new AvroOutputFormat<>(new 
Path
+          (filenamePrefix), extractedAvroType));
+
+      if (numShards > 0) {
+        dataSink.setParallelism(numShards);
+      }
+    }
+  }
+
+  private static class TextIOReadTranslatorBatch implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<TextIO.Read.Bound<String>>
 {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TextIOReadTranslatorBatch.class);
+
+    @Override
+    public void translateNode(TextIO.Read.Bound<String> transform, 
FlinkBatchTranslationContext context) {
+      String path = transform.getFilepattern();
+      String name = transform.getName();
+
+      TextIO.CompressionType compressionType = transform.getCompressionType();
+      boolean needsValidation = transform.needsValidation();
+
+      // TODO: Implement these. We need Flink support for this.
+      LOG.warn("Translation of TextIO.CompressionType not yet supported. Is: 
{}.", compressionType);
+      LOG.warn("Translation of TextIO.Read.needsValidation not yet supported. 
Is: {}.", needsValidation);
+
+      PValue output = context.getOutput(transform);
+
+      TypeInformation<String> typeInformation = context.getTypeInfo(output);
+      DataSource<String> source = new 
DataSource<>(context.getExecutionEnvironment(), new TextInputFormat(new 
Path(path)), typeInformation, name);
+
+      context.setOutputDataSet(output, source);
+    }
+  }
+
+  private static class TextIOWriteTranslatorBatch<T> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<TextIO.Write.Bound<T>> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TextIOWriteTranslatorBatch.class);
+
+    @Override
+    public void translateNode(TextIO.Write.Bound<T> transform, 
FlinkBatchTranslationContext context) {
+      PValue input = context.getInput(transform);
+      DataSet<T> inputDataSet = context.getInputDataSet(input);
+
+      String filenamePrefix = transform.getFilenamePrefix();
+      String filenameSuffix = transform.getFilenameSuffix();
+      boolean needsValidation = transform.needsValidation();
+      int numShards = transform.getNumShards();
+      String shardNameTemplate = transform.getShardNameTemplate();
+
+      // TODO: Implement these. We need Flink support for this.
+      LOG.warn("Translation of TextIO.Write.needsValidation not yet supported. 
Is: {}.", needsValidation);
+      LOG.warn("Translation of TextIO.Write.filenameSuffix not yet supported. 
Is: {}.", filenameSuffix);
+      LOG.warn("Translation of TextIO.Write.shardNameTemplate not yet 
supported. Is: {}.", shardNameTemplate);
+
+      //inputDataSet.print();
+      DataSink<T> dataSink = inputDataSet.writeAsText(filenamePrefix);
+
+      if (numShards > 0) {
+        dataSink.setParallelism(numShards);
+      }
+    }
+  }
+
+  private static class ConsoleIOWriteTranslatorBatch implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<ConsoleIO.Write.Bound> {
+    @Override
+    public void translateNode(ConsoleIO.Write.Bound transform, 
FlinkBatchTranslationContext context) {
+      PValue input = context.getInput(transform);
+      DataSet<?> inputDataSet = context.getInputDataSet(input);
+      inputDataSet.printOnTaskManager(transform.getName());
+    }
+  }
 
-       private static class WriteSinkTranslatorBatch<T> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<Write.Bound<T>> {
-
-               @Override
-               public void translateNode(Write.Bound<T> transform, 
FlinkBatchTranslationContext context) {
-                       String name = transform.getName();
-                       PValue input = context.getInput(transform);
-                       DataSet<T> inputDataSet = 
context.getInputDataSet(input);
+  private static class WriteSinkTranslatorBatch<T> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<Write.Bound<T>> {
+
+    @Override
+    public void translateNode(Write.Bound<T> transform, 
FlinkBatchTranslationContext context) {
+      String name = transform.getName();
+      PValue input = context.getInput(transform);
+      DataSet<T> inputDataSet = context.getInputDataSet(input);
 
-                       inputDataSet.output(new SinkOutputFormat<>(transform, 
context.getPipelineOptions())).name(name);
-               }
-       }
+      inputDataSet.output(new SinkOutputFormat<>(transform, 
context.getPipelineOptions())).name(name);
+    }
+  }
 
-       private static class GroupByKeyOnlyTranslatorBatch<K, V> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey.GroupByKeyOnly<K,
 V>> {
+  private static class GroupByKeyOnlyTranslatorBatch<K, V> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey.GroupByKeyOnly<K,
 V>> {
 
-               @Override
-               public void translateNode(GroupByKey.GroupByKeyOnly<K, V> 
transform, FlinkBatchTranslationContext context) {
-                       DataSet<KV<K, V>> inputDataSet = 
context.getInputDataSet(context.getInput(transform));
-                       GroupReduceFunction<KV<K, V>, KV<K, Iterable<V>>> 
groupReduceFunction = new FlinkKeyedListAggregationFunction<>();
+    @Override
+    public void translateNode(GroupByKey.GroupByKeyOnly<K, V> transform, 
FlinkBatchTranslationContext context) {
+      DataSet<KV<K, V>> inputDataSet = 
context.getInputDataSet(context.getInput(transform));
+      GroupReduceFunction<KV<K, V>, KV<K, Iterable<V>>> groupReduceFunction = 
new FlinkKeyedListAggregationFunction<>();
 
-                       TypeInformation<KV<K, Iterable<V>>> typeInformation = 
context.getTypeInfo(context.getOutput(transform));
+      TypeInformation<KV<K, Iterable<V>>> typeInformation = 
context.getTypeInfo(context.getOutput(transform));
 
-                       Grouping<KV<K, V>> grouping = new 
UnsortedGrouping<>(inputDataSet, new Keys.ExpressionKeys<>(new String[]{"key"}, 
inputDataSet.getType()));
+      Grouping<KV<K, V>> grouping = new UnsortedGrouping<>(inputDataSet, new 
Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet.getType()));
 
-                       GroupReduceOperator<KV<K, V>, KV<K, Iterable<V>>> 
outputDataSet =
-                                       new GroupReduceOperator<>(grouping, 
typeInformation, groupReduceFunction, transform.getName());
-                       context.setOutputDataSet(context.getOutput(transform), 
outputDataSet);
-               }
-       }
+      GroupReduceOperator<KV<K, V>, KV<K, Iterable<V>>> outputDataSet =
+          new GroupReduceOperator<>(grouping, typeInformation, 
groupReduceFunction, transform.getName());
+      context.setOutputDataSet(context.getOutput(transform), outputDataSet);
+    }
+  }
 
-       /**
-        * Translates a GroupByKey while ignoring window assignments. This is 
identical to the {@link GroupByKeyOnlyTranslatorBatch}
-        */
-       private static class GroupByKeyTranslatorBatch<K, V> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey<K, V>> {
+  /**
+   * Translates a GroupByKey while ignoring window assignments. This is 
identical to the {@link GroupByKeyOnlyTranslatorBatch}
+   */
+  private static class GroupByKeyTranslatorBatch<K, V> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey<K, V>> {
 
-               @Override
-               public void translateNode(GroupByKey<K, V> transform, 
FlinkBatchTranslationContext context) {
-                       DataSet<KV<K, V>> inputDataSet = 
context.getInputDataSet(context.getInput(transform));
-                       GroupReduceFunction<KV<K, V>, KV<K, Iterable<V>>> 
groupReduceFunction = new FlinkKeyedListAggregationFunction<>();
+    @Override
+    public void translateNode(GroupByKey<K, V> transform, 
FlinkBatchTranslationContext context) {
+      DataSet<KV<K, V>> inputDataSet = 
context.getInputDataSet(context.getInput(transform));
+      GroupReduceFunction<KV<K, V>, KV<K, Iterable<V>>> groupReduceFunction = 
new FlinkKeyedListAggregationFunction<>();
 
-                       TypeInformation<KV<K, Iterable<V>>> typeInformation = 
context.getTypeInfo(context.getOutput(transform));
+      TypeInformation<KV<K, Iterable<V>>> typeInformation = 
context.getTypeInfo(context.getOutput(transform));
 
-                       Grouping<KV<K, V>> grouping = new 
UnsortedGrouping<>(inputDataSet, new Keys.ExpressionKeys<>(new String[]{"key"}, 
inputDataSet.getType()));
+      Grouping<KV<K, V>> grouping = new UnsortedGrouping<>(inputDataSet, new 
Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet.getType()));
 
-                       GroupReduceOperator<KV<K, V>, KV<K, Iterable<V>>> 
outputDataSet =
-                                       new GroupReduceOperator<>(grouping, 
typeInformation, groupReduceFunction, transform.getName());
+      GroupReduceOperator<KV<K, V>, KV<K, Iterable<V>>> outputDataSet =
+          new GroupReduceOperator<>(grouping, typeInformation, 
groupReduceFunction, transform.getName());
 
-                       context.setOutputDataSet(context.getOutput(transform), 
outputDataSet);
-               }
-       }
+      context.setOutputDataSet(context.getOutput(transform), outputDataSet);
+    }
+  }
 
-       private static class CombinePerKeyTranslatorBatch<K, VI, VA, VO> 
implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<Combine.PerKey<K, VI, 
VO>> {
+  private static class CombinePerKeyTranslatorBatch<K, VI, VA, VO> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<Combine.PerKey<K, VI, 
VO>> {
 
-               @Override
-               public void translateNode(Combine.PerKey<K, VI, VO> transform, 
FlinkBatchTranslationContext context) {
-                       DataSet<KV<K, VI>> inputDataSet = 
context.getInputDataSet(context.getInput(transform));
+    @Override
+    public void translateNode(Combine.PerKey<K, VI, VO> transform, 
FlinkBatchTranslationContext context) {
+      DataSet<KV<K, VI>> inputDataSet = 
context.getInputDataSet(context.getInput(transform));
 
-                       @SuppressWarnings("unchecked")
-                       Combine.KeyedCombineFn<K, VI, VA, VO> keyedCombineFn = 
(Combine.KeyedCombineFn<K, VI, VA, VO>) transform.getFn();
+      @SuppressWarnings("unchecked")
+      Combine.KeyedCombineFn<K, VI, VA, VO> keyedCombineFn = 
(Combine.KeyedCombineFn<K, VI, VA, VO>) transform.getFn();
 
-                       KvCoder<K, VI> inputCoder = (KvCoder<K, VI>) 
context.getInput(transform).getCoder();
+      KvCoder<K, VI> inputCoder = (KvCoder<K, VI>) 
context.getInput(transform).getCoder();
 
-                       Coder<VA> accumulatorCoder =
-                                       null;
-                       try {
-                               accumulatorCoder = 
keyedCombineFn.getAccumulatorCoder(context.getInput(transform).getPipeline().getCoderRegistry(),
 inputCoder.getKeyCoder(), inputCoder.getValueCoder());
-                       } catch (CannotProvideCoderException e) {
-                               e.printStackTrace();
-                               // TODO
-                       }
+      Coder<VA> accumulatorCoder =
+          null;
+      try {
+        accumulatorCoder = 
keyedCombineFn.getAccumulatorCoder(context.getInput(transform).getPipeline().getCoderRegistry(),
 inputCoder.getKeyCoder(), inputCoder.getValueCoder());
+      } catch (CannotProvideCoderException e) {
+        e.printStackTrace();
+        // TODO
+      }
 
-                       TypeInformation<KV<K, VI>> kvCoderTypeInformation = new 
KvCoderTypeInformation<>(inputCoder);
-                       TypeInformation<KV<K, VA>> partialReduceTypeInfo = new 
KvCoderTypeInformation<>(KvCoder.of(inputCoder.getKeyCoder(), 
accumulatorCoder));
+      TypeInformation<KV<K, VI>> kvCoderTypeInformation = new 
KvCoderTypeInformation<>(inputCoder);
+      TypeInformation<KV<K, VA>> partialReduceTypeInfo = new 
KvCoderTypeInformation<>(KvCoder.of(inputCoder.getKeyCoder(), 
accumulatorCoder));
 
-                       Grouping<KV<K, VI>> inputGrouping = new 
UnsortedGrouping<>(inputDataSet, new Keys.ExpressionKeys<>(new String[]{"key"}, 
kvCoderTypeInformation));
+      Grouping<KV<K, VI>> inputGrouping = new UnsortedGrouping<>(inputDataSet, 
new Keys.ExpressionKeys<>(new String[]{"key"}, kvCoderTypeInformation));
 
-                       FlinkPartialReduceFunction<K, VI, VA> 
partialReduceFunction = new FlinkPartialReduceFunction<>(keyedCombineFn);
+      FlinkPartialReduceFunction<K, VI, VA> partialReduceFunction = new 
FlinkPartialReduceFunction<>(keyedCombineFn);
 
-                       // Partially GroupReduce the values into the 
intermediate format VA (combine)
-                       GroupCombineOperator<KV<K, VI>, KV<K, VA>> groupCombine 
=
-                                       new 
GroupCombineOperator<>(inputGrouping, partialReduceTypeInfo, 
partialReduceFunction,
-                                                       "GroupCombine: " + 
transform.getName());
+      // Partially GroupReduce the values into the intermediate format VA 
(combine)
+      GroupCombineOperator<KV<K, VI>, KV<K, VA>> groupCombine =
+          new GroupCombineOperator<>(inputGrouping, partialReduceTypeInfo, 
partialReduceFunction,
+              "GroupCombine: " + transform.getName());
 
-                       // Reduce fully to VO
-                       GroupReduceFunction<KV<K, VA>, KV<K, VO>> 
reduceFunction = new FlinkReduceFunction<>(keyedCombineFn);
+      // Reduce fully to VO
+      GroupReduceFunction<KV<K, VA>, KV<K, VO>> reduceFunction = new 
FlinkReduceFunction<>(keyedCombineFn);
 
-                       TypeInformation<KV<K, VO>> reduceTypeInfo = 
context.getTypeInfo(context.getOutput(transform));
+      TypeInformation<KV<K, VO>> reduceTypeInfo = 
context.getTypeInfo(context.getOutput(transform));
 
-                       Grouping<KV<K, VA>> intermediateGrouping = new 
UnsortedGrouping<>(groupCombine, new Keys.ExpressionKeys<>(new String[]{"key"}, 
groupCombine.getType()));
+      Grouping<KV<K, VA>> intermediateGrouping = new 
UnsortedGrouping<>(groupCombine, new Keys.ExpressionKeys<>(new String[]{"key"}, 
groupCombine.getType()));
 
-                       // Fully reduce the values and create output format VO
-                       GroupReduceOperator<KV<K, VA>, KV<K, VO>> outputDataSet 
=
-                                       new 
GroupReduceOperator<>(intermediateGrouping, reduceTypeInfo, reduceFunction, 
transform.getName());
+      // Fully reduce the values and create output format VO
+      GroupReduceOperator<KV<K, VA>, KV<K, VO>> outputDataSet =
+          new GroupReduceOperator<>(intermediateGrouping, reduceTypeInfo, 
reduceFunction, transform.getName());
 
-                       context.setOutputDataSet(context.getOutput(transform), 
outputDataSet);
-               }
-       }
+      context.setOutputDataSet(context.getOutput(transform), outputDataSet);
+    }
+  }
 
-//     private static class CombineGroupedValuesTranslator<K, VI, VO> 
implements FlinkPipelineTranslator.TransformTranslator<Combine.GroupedValues<K, 
VI, VO>> {
+//  private static class CombineGroupedValuesTranslator<K, VI, VO> implements 
FlinkPipelineTranslator.TransformTranslator<Combine.GroupedValues<K, VI, VO>> {
 //
-//             @Override
-//             public void translateNode(Combine.GroupedValues<K, VI, VO> 
transform, TranslationContext context) {
-//                     DataSet<KV<K, VI>> inputDataSet = 
context.getInputDataSet(transform.getInput());
+//    @Override
+//    public void translateNode(Combine.GroupedValues<K, VI, VO> transform, 
TranslationContext context) {
+//      DataSet<KV<K, VI>> inputDataSet = 
context.getInputDataSet(transform.getInput());
 //
-//                     Combine.KeyedCombineFn<? super K, ? super VI, ?, VO> 
keyedCombineFn = transform.getFn();
+//      Combine.KeyedCombineFn<? super K, ? super VI, ?, VO> keyedCombineFn = 
transform.getFn();
 //
-//                     GroupReduceFunction<KV<K, VI>, KV<K, VO>> 
groupReduceFunction = new FlinkCombineFunction<>(keyedCombineFn);
+//      GroupReduceFunction<KV<K, VI>, KV<K, VO>> groupReduceFunction = new 
FlinkCombineFunction<>(keyedCombineFn);
 //
-//                     TypeInformation<KV<K, VO>> typeInformation = 
context.getTypeInfo(transform.getOutput());
+//      TypeInformation<KV<K, VO>> typeInformation = 
context.getTypeInfo(transform.getOutput());
 //
-//                     Grouping<KV<K, VI>> grouping = new 
UnsortedGrouping<>(inputDataSet, new Keys.ExpressionKeys<>(new String[]{""}, 
inputDataSet.getType()));
+//      Grouping<KV<K, VI>> grouping = new UnsortedGrouping<>(inputDataSet, 
new Keys.ExpressionKeys<>(new String[]{""}, inputDataSet.getType()));
 //
-//                     GroupReduceOperator<KV<K, VI>, KV<K, VO>> outputDataSet 
=
-//                                     new GroupReduceOperator<>(grouping, 
typeInformation, groupReduceFunction, transform.getName());
-//                     context.setOutputDataSet(transform.getOutput(), 
outputDataSet);
-//             }
-//     }
-       
-       private static class ParDoBoundTranslatorBatch<IN, OUT> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<ParDo.Bound<IN, OUT>> {
-               private static final Logger LOG = 
LoggerFactory.getLogger(ParDoBoundTranslatorBatch.class);
-
-               @Override
-               public void translateNode(ParDo.Bound<IN, OUT> transform, 
FlinkBatchTranslationContext context) {
-                       DataSet<IN> inputDataSet = 
context.getInputDataSet(context.getInput(transform));
-
-                       final DoFn<IN, OUT> doFn = transform.getFn();
-
-                       TypeInformation<OUT> typeInformation = 
context.getTypeInfo(context.getOutput(transform));
-
-                       FlinkDoFnFunction<IN, OUT> doFnWrapper = new 
FlinkDoFnFunction<>(doFn, context.getPipelineOptions());
-                       MapPartitionOperator<IN, OUT> outputDataSet = new 
MapPartitionOperator<>(inputDataSet, typeInformation, doFnWrapper, 
transform.getName());
-
-                       transformSideInputs(transform.getSideInputs(), 
outputDataSet, context);
-
-                       context.setOutputDataSet(context.getOutput(transform), 
outputDataSet);
-               }
-       }
-
-       private static class ParDoBoundMultiTranslatorBatch<IN, OUT> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<ParDo.BoundMulti<IN, 
OUT>> {
-               private static final Logger LOG = 
LoggerFactory.getLogger(ParDoBoundMultiTranslatorBatch.class);
-
-               @Override
-               public void translateNode(ParDo.BoundMulti<IN, OUT> transform, 
FlinkBatchTranslationContext context) {
-                       DataSet<IN> inputDataSet = 
context.getInputDataSet(context.getInput(transform));
-
-                       final DoFn<IN, OUT> doFn = transform.getFn();
-
-                       Map<TupleTag<?>, PCollection<?>> outputs = 
context.getOutput(transform).getAll();
-
-                       Map<TupleTag<?>, Integer> outputMap = Maps.newHashMap();
-                       // put the main output at index 0, 
FlinkMultiOutputDoFnFunction also expects this
-                       outputMap.put(transform.getMainOutputTag(), 0);
-                       int count = 1;
-                       for (TupleTag<?> tag: outputs.keySet()) {
-                               if (!outputMap.containsKey(tag)) {
-                                       outputMap.put(tag, count++);
-                               }
-                       }
-
-                       // collect all output Coders and create a UnionCoder 
for our tagged outputs
-                       List<Coder<?>> outputCoders = Lists.newArrayList();
-                       for (PCollection<?> coll: outputs.values()) {
-                               outputCoders.add(coll.getCoder());
-                       }
-
-                       UnionCoder unionCoder = UnionCoder.of(outputCoders);
-
-                       @SuppressWarnings("unchecked")
-                       TypeInformation<RawUnionValue> typeInformation = new 
CoderTypeInformation<>(unionCoder);
-
-                       @SuppressWarnings("unchecked")
-                       FlinkMultiOutputDoFnFunction<IN, OUT> doFnWrapper = new 
FlinkMultiOutputDoFnFunction(doFn, context.getPipelineOptions(), outputMap);
-                       MapPartitionOperator<IN, RawUnionValue> outputDataSet = 
new MapPartitionOperator<>(inputDataSet, typeInformation, doFnWrapper, 
transform.getName());
-
-                       transformSideInputs(transform.getSideInputs(), 
outputDataSet, context);
-
-                       for (Map.Entry<TupleTag<?>, PCollection<?>> output: 
outputs.entrySet()) {
-                               TypeInformation<Object> outputType = 
context.getTypeInfo(output.getValue());
-                               int outputTag = outputMap.get(output.getKey());
-                               FlinkMultiOutputPruningFunction<Object> 
pruningFunction = new FlinkMultiOutputPruningFunction<>(outputTag);
-                               FlatMapOperator<RawUnionValue, Object> 
pruningOperator = new
-                                               
FlatMapOperator<>(outputDataSet, outputType,
-                                               pruningFunction, 
output.getValue().getName());
-                               context.setOutputDataSet(output.getValue(), 
pruningOperator);
-
-                       }
-               }
-       }
-
-       private static class FlattenPCollectionTranslatorBatch<T> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<Flatten.FlattenPCollectionList<T>>
 {
-
-               @Override
-               public void translateNode(Flatten.FlattenPCollectionList<T> 
transform, FlinkBatchTranslationContext context) {
-                       List<PCollection<T>> allInputs = 
context.getInput(transform).getAll();
-                       DataSet<T> result = null;
-                       for(PCollection<T> collection : allInputs) {
-                               DataSet<T> current = 
context.getInputDataSet(collection);
-                               if (result == null) {
-                                       result = current;
-                               } else {
-                                       result = result.union(current);
-                               }
-                       }
-                       context.setOutputDataSet(context.getOutput(transform), 
result);
-               }
-       }
-
-       private static class CreatePCollectionViewTranslatorBatch<R, T> 
implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<View.CreatePCollectionView<R,
 T>> {
-               @Override
-               public void translateNode(View.CreatePCollectionView<R, T> 
transform, FlinkBatchTranslationContext context) {
-                       DataSet<T> inputDataSet = 
context.getInputDataSet(context.getInput(transform));
-                       PCollectionView<T> input = transform.apply(null);
-                       context.setSideInputDataSet(input, inputDataSet);
-               }
-       }
-
-       private static class CreateTranslatorBatch<OUT> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<Create.Values<OUT>> {
-
-               @Override
-               public void translateNode(Create.Values<OUT> transform, 
FlinkBatchTranslationContext context) {
-                       TypeInformation<OUT> typeInformation = 
context.getOutputTypeInfo();
-                       Iterable<OUT> elements = transform.getElements();
-
-                       // we need to serialize the elements to byte arrays, 
since they might contain
-                       // elements that are not serializable by Java 
serialization. We deserialize them
-                       // in the FlatMap function using the Coder.
-
-                       List<byte[]> serializedElements = Lists.newArrayList();
-                       Coder<OUT> coder = 
context.getOutput(transform).getCoder();
-                       for (OUT element: elements) {
-                               ByteArrayOutputStream bao = new 
ByteArrayOutputStream();
-                               try {
-                                       coder.encode(element, bao, 
Coder.Context.OUTER);
-                                       
serializedElements.add(bao.toByteArray());
-                               } catch (IOException e) {
-                                       throw new RuntimeException("Could not 
serialize Create elements using Coder: " + e);
-                               }
-                       }
-
-                       DataSet<Integer> initDataSet = 
context.getExecutionEnvironment().fromElements(1);
-                       FlinkCreateFunction<Integer, OUT> flatMapFunction = new 
FlinkCreateFunction<>(serializedElements, coder);
-                       FlatMapOperator<Integer, OUT> outputDataSet = new 
FlatMapOperator<>(initDataSet, typeInformation, flatMapFunction, 
transform.getName());
-
-                       context.setOutputDataSet(context.getOutput(transform), 
outputDataSet);
-               }
-       }
-
-       private static void transformSideInputs(List<PCollectionView<?>> 
sideInputs,
-                                               MapPartitionOperator<?, ?> 
outputDataSet,
-                                               FlinkBatchTranslationContext 
context) {
-               // get corresponding Flink broadcast DataSets
-               for(PCollectionView<?> input : sideInputs) {
-                       DataSet<?> broadcastSet = 
context.getSideInputDataSet(input);
-                       outputDataSet.withBroadcastSet(broadcastSet, 
input.getTagInternal().getId());
-               }
-       }
+//      GroupReduceOperator<KV<K, VI>, KV<K, VO>> outputDataSet =
+//          new GroupReduceOperator<>(grouping, typeInformation, 
groupReduceFunction, transform.getName());
+//      context.setOutputDataSet(transform.getOutput(), outputDataSet);
+//    }
+//  }
+  
+  private static class ParDoBoundTranslatorBatch<IN, OUT> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<ParDo.Bound<IN, OUT>> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ParDoBoundTranslatorBatch.class);
+
+    @Override
+    public void translateNode(ParDo.Bound<IN, OUT> transform, 
FlinkBatchTranslationContext context) {
+      DataSet<IN> inputDataSet = 
context.getInputDataSet(context.getInput(transform));
+
+      final DoFn<IN, OUT> doFn = transform.getFn();
+
+      TypeInformation<OUT> typeInformation = 
context.getTypeInfo(context.getOutput(transform));
+
+      FlinkDoFnFunction<IN, OUT> doFnWrapper = new FlinkDoFnFunction<>(doFn, 
context.getPipelineOptions());
+      MapPartitionOperator<IN, OUT> outputDataSet = new 
MapPartitionOperator<>(inputDataSet, typeInformation, doFnWrapper, 
transform.getName());
+
+      transformSideInputs(transform.getSideInputs(), outputDataSet, context);
+
+      context.setOutputDataSet(context.getOutput(transform), outputDataSet);
+    }
+  }
+
+  private static class ParDoBoundMultiTranslatorBatch<IN, OUT> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<ParDo.BoundMulti<IN, 
OUT>> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ParDoBoundMultiTranslatorBatch.class);
+
+    @Override
+    public void translateNode(ParDo.BoundMulti<IN, OUT> transform, 
FlinkBatchTranslationContext context) {
+      DataSet<IN> inputDataSet = 
context.getInputDataSet(context.getInput(transform));
+
+      final DoFn<IN, OUT> doFn = transform.getFn();
+
+      Map<TupleTag<?>, PCollection<?>> outputs = 
context.getOutput(transform).getAll();
+
+      Map<TupleTag<?>, Integer> outputMap = Maps.newHashMap();
+      // put the main output at index 0, FlinkMultiOutputDoFnFunction also 
expects this
+      outputMap.put(transform.getMainOutputTag(), 0);
+      int count = 1;
+      for (TupleTag<?> tag: outputs.keySet()) {
+        if (!outputMap.containsKey(tag)) {
+          outputMap.put(tag, count++);
+        }
+      }
+
+      // collect all output Coders and create a UnionCoder for our tagged 
outputs
+      List<Coder<?>> outputCoders = Lists.newArrayList();
+      for (PCollection<?> coll: outputs.values()) {
+        outputCoders.add(coll.getCoder());
+      }
+
+      UnionCoder unionCoder = UnionCoder.of(outputCoders);
+
+      @SuppressWarnings("unchecked")
+      TypeInformation<RawUnionValue> typeInformation = new 
CoderTypeInformation<>(unionCoder);
+
+      @SuppressWarnings("unchecked")
+      FlinkMultiOutputDoFnFunction<IN, OUT> doFnWrapper = new 
FlinkMultiOutputDoFnFunction(doFn, context.getPipelineOptions(), outputMap);
+      MapPartitionOperator<IN, RawUnionValue> outputDataSet = new 
MapPartitionOperator<>(inputDataSet, typeInformation, doFnWrapper, 
transform.getName());
+
+      transformSideInputs(transform.getSideInputs(), outputDataSet, context);
+
+      for (Map.Entry<TupleTag<?>, PCollection<?>> output: outputs.entrySet()) {
+        TypeInformation<Object> outputType = 
context.getTypeInfo(output.getValue());
+        int outputTag = outputMap.get(output.getKey());
+        FlinkMultiOutputPruningFunction<Object> pruningFunction = new 
FlinkMultiOutputPruningFunction<>(outputTag);
+        FlatMapOperator<RawUnionValue, Object> pruningOperator = new
+            FlatMapOperator<>(outputDataSet, outputType,
+            pruningFunction, output.getValue().getName());
+        context.setOutputDataSet(output.getValue(), pruningOperator);
+
+      }
+    }
+  }
+
+  private static class FlattenPCollectionTranslatorBatch<T> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<Flatten.FlattenPCollectionList<T>>
 {
+
+    @Override
+    public void translateNode(Flatten.FlattenPCollectionList<T> transform, 
FlinkBatchTranslationContext context) {
+      List<PCollection<T>> allInputs = context.getInput(transform).getAll();
+      DataSet<T> result = null;
+      for(PCollection<T> collection : allInputs) {
+        DataSet<T> current = context.getInputDataSet(collection);
+        if (result == null) {
+          result = current;
+        } else {
+          result = result.union(current);
+        }
+      }
+      context.setOutputDataSet(context.getOutput(transform), result);
+    }
+  }
+
+  private static class CreatePCollectionViewTranslatorBatch<R, T> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<View.CreatePCollectionView<R,
 T>> {
+    @Override
+    public void translateNode(View.CreatePCollectionView<R, T> transform, 
FlinkBatchTranslationContext context) {
+      DataSet<T> inputDataSet = 
context.getInputDataSet(context.getInput(transform));
+      PCollectionView<T> input = transform.apply(null);
+      context.setSideInputDataSet(input, inputDataSet);
+    }
+  }
+
+  private static class CreateTranslatorBatch<OUT> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<Create.Values<OUT>> {
+
+    @Override
+    public void translateNode(Create.Values<OUT> transform, 
FlinkBatchTranslationContext context) {
+      TypeInformation<OUT> typeInformation = context.getOutputTypeInfo();
+      Iterable<OUT> elements = transform.getElements();
+
+      // we need to serialize the elements to byte arrays, since they might 
contain
+      // elements that are not serializable by Java serialization. We 
deserialize them
+      // in the FlatMap function using the Coder.
+
+      List<byte[]> serializedElements = Lists.newArrayList();
+      Coder<OUT> coder = context.getOutput(transform).getCoder();
+      for (OUT element: elements) {
+        ByteArrayOutputStream bao = new ByteArrayOutputStream();
+        try {
+          coder.encode(element, bao, Coder.Context.OUTER);
+          serializedElements.add(bao.toByteArray());
+        } catch (IOException e) {
+          throw new RuntimeException("Could not serialize Create elements 
using Coder: " + e);
+        }
+      }
+
+      DataSet<Integer> initDataSet = 
context.getExecutionEnvironment().fromElements(1);
+      FlinkCreateFunction<Integer, OUT> flatMapFunction = new 
FlinkCreateFunction<>(serializedElements, coder);
+      FlatMapOperator<Integer, OUT> outputDataSet = new 
FlatMapOperator<>(initDataSet, typeInformation, flatMapFunction, 
transform.getName());
+
+      context.setOutputDataSet(context.getOutput(transform), outputDataSet);
+    }
+  }
+
+  private static void transformSideInputs(List<PCollectionView<?>> sideInputs,
+                                          MapPartitionOperator<?, ?> 
outputDataSet,
+                                          FlinkBatchTranslationContext 
context) {
+    // get corresponding Flink broadcast DataSets
+    for(PCollectionView<?> input : sideInputs) {
+      DataSet<?> broadcastSet = context.getSideInputDataSet(input);
+      outputDataSet.withBroadcastSet(broadcastSet, 
input.getTagInternal().getId());
+    }
+  }
 
// Disabled because it depends on a pending pull request to the Dataflow SDK
-       /**
-        * Special composite transform translator. Only called if the CoGroup 
is two dimensional.
-        * @param <K>
-        */
-       private static class CoGroupByKeyTranslatorBatch<K, V1, V2> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<CoGroupByKey<K>> {
+  /**
+   * Special composite transform translator. Only called if the CoGroup is two 
dimensional.
+   * @param <K>
+   */
+  private static class CoGroupByKeyTranslatorBatch<K, V1, V2> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<CoGroupByKey<K>> {
 
-               @Override
-               public void translateNode(CoGroupByKey<K> transform, 
FlinkBatchTranslationContext context) {
-                       KeyedPCollectionTuple<K> input = 
context.getInput(transform);
+    @Override
+    public void translateNode(CoGroupByKey<K> transform, 
FlinkBatchTranslationContext context) {
+      KeyedPCollectionTuple<K> input = context.getInput(transform);
 
-                       CoGbkResultSchema schema = input.getCoGbkResultSchema();
-                       List<KeyedPCollectionTuple.TaggedKeyedPCollection<K, 
?>> keyedCollections = input.getKeyedCollections();
+      CoGbkResultSchema schema = input.getCoGbkResultSchema();
+      List<KeyedPCollectionTuple.TaggedKeyedPCollection<K, ?>> 
keyedCollections = input.getKeyedCollections();
 
-                       KeyedPCollectionTuple.TaggedKeyedPCollection<K, ?> 
taggedCollection1 = keyedCollections.get(0);
-                       KeyedPCollectionTuple.TaggedKeyedPCollection<K, ?> 
taggedCollection2 = keyedCollections.get(1);
+      KeyedPCollectionTuple.TaggedKeyedPCollection<K, ?> taggedCollection1 = 
keyedCollections.get(0);
+      KeyedPCollectionTuple.TaggedKeyedPCollection<K, ?> taggedCollection2 = 
keyedCollections.get(1);
 
-                       TupleTag<?> tupleTag1 = taggedCollection1.getTupleTag();
-                       TupleTag<?> tupleTag2 = taggedCollection2.getTupleTag();
+      TupleTag<?> tupleTag1 = taggedCollection1.getTupleTag();
+      TupleTag<?> tupleTag2 = taggedCollection2.getTupleTag();
 
-                       PCollection<? extends KV<K, ?>> collection1 = 
taggedCollection1.getCollection();
-                       PCollection<? extends KV<K, ?>> collection2 = 
taggedCollection2.getCollection();
+      PCollection<? extends KV<K, ?>> collection1 = 
taggedCollection1.getCollection();
+      PCollection<? extends KV<K, ?>> collection2 = 
taggedCollection2.getCollection();
 
-                       DataSet<KV<K,V1>> inputDataSet1 = 
context.getInputDataSet(collection1);
-                       DataSet<KV<K,V2>> inputDataSet2 = 
context.getInputDataSet(collection2);
+      DataSet<KV<K,V1>> inputDataSet1 = context.getInputDataSet(collection1);
+      DataSet<KV<K,V2>> inputDataSet2 = context.getInputDataSet(collection2);
 
-                       TypeInformation<KV<K,CoGbkResult>> typeInfo = 
context.getOutputTypeInfo();
+      TypeInformation<KV<K,CoGbkResult>> typeInfo = 
context.getOutputTypeInfo();
 
-                       FlinkCoGroupKeyedListAggregator<K,V1,V2> aggregator = 
new FlinkCoGroupKeyedListAggregator<>(schema, tupleTag1, tupleTag2);
+      FlinkCoGroupKeyedListAggregator<K,V1,V2> aggregator = new 
FlinkCoGroupKeyedListAggregator<>(schema, tupleTag1, tupleTag2);
 
-                       Keys.ExpressionKeys<KV<K,V1>> keySelector1 = new 
Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet1.getType());
-                       Keys.ExpressionKeys<KV<K,V2>> keySelector2 = new 
Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet2.getType());
+      Keys.ExpressionKeys<KV<K,V1>> keySelector1 = new 
Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet1.getType());
+      Keys.ExpressionKeys<KV<K,V2>> keySelector2 = new 
Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet2.getType());
 
-                       DataSet<KV<K, CoGbkResult>> out = new 
CoGroupOperator<>(inputDataSet1, inputDataSet2,
-                                                                               
                                                        keySelector1, 
keySelector2,
-                                                                               
        aggregator, typeInfo, null, transform.getName());
-                       context.setOutputDataSet(context.getOutput(transform), 
out);
-               }
-       }
+      DataSet<KV<K, CoGbkResult>> out = new CoGroupOperator<>(inputDataSet1, 
inputDataSet2,
+                                  keySelector1, keySelector2,
+                                                          aggregator, 
typeInfo, null, transform.getName());
+      context.setOutputDataSet(context.getOutput(transform), out);
+    }
+  }
 
-       // 
--------------------------------------------------------------------------------------------
-       //  Miscellaneous
-       // 
--------------------------------------------------------------------------------------------
-       
-       private FlinkBatchTransformTranslators() {}
+  // 
--------------------------------------------------------------------------------------------
+  //  Miscellaneous
+  // 
--------------------------------------------------------------------------------------------
+  
  // Static utility holder for translator classes; private constructor prevents instantiation.
  private FlinkBatchTransformTranslators() {}
 }

Reply via email to