http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java
deleted file mode 100644
index 71e3b54..0000000
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.io;
-
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PDone;
-
-/**
- * Transform for printing the contents of a {@link 
com.google.cloud.dataflow.sdk.values.PCollection}.
- * to standard output.
- *
- * This is Flink-specific and will only work when executed using the
- * {@link org.apache.beam.runners.flink.FlinkPipelineRunner}.
- */
-public class ConsoleIO {
-
-  /**
-   * A PTransform that writes a PCollection to a standard output.
-   */
-  public static class Write {
-
-    /**
-     * Returns a ConsoleIO.Write PTransform with a default step name.
-     */
-    public static Bound create() {
-      return new Bound();
-    }
-
-    /**
-     * Returns a ConsoleIO.Write PTransform with the given step name.
-     */
-    public static Bound named(String name) {
-      return new Bound().named(name);
-    }
-
-    /**
-     * A PTransform that writes a bounded PCollection to standard output.
-     */
-    public static class Bound extends PTransform<PCollection<?>, PDone> {
-      private static final long serialVersionUID = 0;
-
-      Bound() {
-        super("ConsoleIO.Write");
-      }
-
-      Bound(String name) {
-        super(name);
-      }
-
-      /**
-       * Returns a new ConsoleIO.Write PTransform that's like this one but 
with the given
-       * step
-       * name.  Does not modify this object.
-       */
-      public Bound named(String name) {
-        return new Bound(name);
-      }
-
-      @Override
-      public PDone apply(PCollection<?> input) {
-        return PDone.in(input.getPipeline());
-      }
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
deleted file mode 100644
index 28a10b7..0000000
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
-import com.google.cloud.dataflow.sdk.values.PValue;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * FlinkBatchPipelineTranslator knows how to translate Pipeline objects into 
Flink Jobs.
- * This is based on {@link 
com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator}
- */
-public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkBatchPipelineTranslator.class);
-
-  /**
-   * The necessary context in the case of a batch job.
-   */
-  private final FlinkBatchTranslationContext batchContext;
-
-  private int depth = 0;
-
-  /**
-   * Composite transform that we want to translate before proceeding with 
other transforms.
-   */
-  private PTransform<?, ?> currentCompositeTransform;
-
-  public FlinkBatchPipelineTranslator(ExecutionEnvironment env, 
PipelineOptions options) {
-    this.batchContext = new FlinkBatchTranslationContext(env, options);
-  }
-
-  // 
--------------------------------------------------------------------------------------------
-  //  Pipeline Visitor Methods
-  // 
--------------------------------------------------------------------------------------------
-
-  @Override
-  public void enterCompositeTransform(TransformTreeNode node) {
-    LOG.info(genSpaces(this.depth) + "enterCompositeTransform- " + 
formatNodeName(node));
-
-    PTransform<?, ?> transform = node.getTransform();
-    if (transform != null && currentCompositeTransform == null) {
-
-      BatchTransformTranslator<?> translator = 
FlinkBatchTransformTranslators.getTranslator(transform);
-      if (translator != null) {
-        currentCompositeTransform = transform;
-        if (transform instanceof CoGroupByKey && 
node.getInput().expand().size() != 2) {
-          // we can only optimize CoGroupByKey for input size 2
-          currentCompositeTransform = null;
-        }
-      }
-    }
-    this.depth++;
-  }
-
-  @Override
-  public void leaveCompositeTransform(TransformTreeNode node) {
-    PTransform<?, ?> transform = node.getTransform();
-    if (transform != null && currentCompositeTransform == transform) {
-
-      BatchTransformTranslator<?> translator = 
FlinkBatchTransformTranslators.getTranslator(transform);
-      if (translator != null) {
-        LOG.info(genSpaces(this.depth) + "doingCompositeTransform- " + 
formatNodeName(node));
-        applyBatchTransform(transform, node, translator);
-        currentCompositeTransform = null;
-      } else {
-        throw new IllegalStateException("Attempted to translate composite 
transform " +
-            "but no translator was found: " + currentCompositeTransform);
-      }
-    }
-    this.depth--;
-    LOG.info(genSpaces(this.depth) + "leaveCompositeTransform- " + 
formatNodeName(node));
-  }
-
-  @Override
-  public void visitTransform(TransformTreeNode node) {
-    LOG.info(genSpaces(this.depth) + "visitTransform- " + 
formatNodeName(node));
-    if (currentCompositeTransform != null) {
-      // ignore it
-      return;
-    }
-
-    // get the transformation corresponding to hte node we are
-    // currently visiting and translate it into its Flink alternative.
-
-    PTransform<?, ?> transform = node.getTransform();
-    BatchTransformTranslator<?> translator = 
FlinkBatchTransformTranslators.getTranslator(transform);
-    if (translator == null) {
-      LOG.info(node.getTransform().getClass().toString());
-      throw new UnsupportedOperationException("The transform " + transform + " 
is currently not supported.");
-    }
-    applyBatchTransform(transform, node, translator);
-  }
-
-  @Override
-  public void visitValue(PValue value, TransformTreeNode producer) {
-    // do nothing here
-  }
-
-  private <T extends PTransform<?, ?>> void applyBatchTransform(PTransform<?, 
?> transform, TransformTreeNode node, BatchTransformTranslator<?> translator) {
-
-    @SuppressWarnings("unchecked")
-    T typedTransform = (T) transform;
-
-    @SuppressWarnings("unchecked")
-    BatchTransformTranslator<T> typedTranslator = 
(BatchTransformTranslator<T>) translator;
-
-    // create the applied PTransform on the batchContext
-    batchContext.setCurrentTransform(AppliedPTransform.of(
-        node.getFullName(), node.getInput(), node.getOutput(), (PTransform) 
transform));
-    typedTranslator.translateNode(typedTransform, batchContext);
-  }
-
-  /**
-   * A translator of a {@link PTransform}.
-   */
-  public interface BatchTransformTranslator<Type extends PTransform> {
-    void translateNode(Type transform, FlinkBatchTranslationContext context);
-  }
-
-  private static String genSpaces(int n) {
-    String s = "";
-    for (int i = 0; i < n; i++) {
-      s += "|   ";
-    }
-    return s;
-  }
-
-  private static String formatNodeName(TransformTreeNode node) {
-    return node.toString().split("@")[1] + node.getTransform();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
deleted file mode 100644
index 48c783d..0000000
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
+++ /dev/null
@@ -1,594 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation;
-
-import org.apache.beam.runners.flink.io.ConsoleIO;
-import 
org.apache.beam.runners.flink.translation.functions.FlinkCoGroupKeyedListAggregator;
-import org.apache.beam.runners.flink.translation.functions.FlinkCreateFunction;
-import org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction;
-import 
org.apache.beam.runners.flink.translation.functions.FlinkKeyedListAggregationFunction;
-import 
org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputDoFnFunction;
-import 
org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputPruningFunction;
-import 
org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
-import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
-import org.apache.beam.runners.flink.translation.functions.UnionCoder;
-import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.runners.flink.translation.types.KvCoderTypeInformation;
-import org.apache.beam.runners.flink.translation.wrappers.SinkOutputFormat;
-import org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat;
-import com.google.api.client.util.Maps;
-import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.io.AvroIO;
-import com.google.cloud.dataflow.sdk.io.BoundedSource;
-import com.google.cloud.dataflow.sdk.io.Read;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.Flatten;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.View;
-import com.google.cloud.dataflow.sdk.transforms.Write;
-import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
-import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResultSchema;
-import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
-import com.google.cloud.dataflow.sdk.transforms.join.RawUnionValue;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.PValue;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.common.collect.Lists;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.operators.Keys;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.io.AvroInputFormat;
-import org.apache.flink.api.java.io.AvroOutputFormat;
-import org.apache.flink.api.java.io.TextInputFormat;
-import org.apache.flink.api.java.operators.CoGroupOperator;
-import org.apache.flink.api.java.operators.DataSink;
-import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.api.java.operators.FlatMapOperator;
-import org.apache.flink.api.java.operators.GroupCombineOperator;
-import org.apache.flink.api.java.operators.GroupReduceOperator;
-import org.apache.flink.api.java.operators.Grouping;
-import org.apache.flink.api.java.operators.MapPartitionOperator;
-import org.apache.flink.api.java.operators.UnsortedGrouping;
-import org.apache.flink.core.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Translators for transforming
- * Dataflow {@link com.google.cloud.dataflow.sdk.transforms.PTransform}s to
- * Flink {@link org.apache.flink.api.java.DataSet}s
- */
-public class FlinkBatchTransformTranslators {
-
-  // 
--------------------------------------------------------------------------------------------
-  //  Transform Translator Registry
-  // 
--------------------------------------------------------------------------------------------
-  
-  @SuppressWarnings("rawtypes")
-  private static final Map<Class<? extends PTransform>, 
FlinkBatchPipelineTranslator.BatchTransformTranslator> TRANSLATORS = new 
HashMap<>();
-
-  // register the known translators
-  static {
-    TRANSLATORS.put(View.CreatePCollectionView.class, new 
CreatePCollectionViewTranslatorBatch());
-
-    TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslatorBatch());
-    // we don't need this because we translate the Combine.PerKey directly
-    //TRANSLATORS.put(Combine.GroupedValues.class, new 
CombineGroupedValuesTranslator());
-
-    TRANSLATORS.put(Create.Values.class, new CreateTranslatorBatch());
-
-    TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new 
FlattenPCollectionTranslatorBatch());
-
-    TRANSLATORS.put(GroupByKey.GroupByKeyOnly.class, new 
GroupByKeyOnlyTranslatorBatch());
-    // TODO we're currently ignoring windows here but that has to change in 
the future
-    TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslatorBatch());
-
-    TRANSLATORS.put(ParDo.BoundMulti.class, new 
ParDoBoundMultiTranslatorBatch());
-    TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundTranslatorBatch());
-
-    TRANSLATORS.put(CoGroupByKey.class, new CoGroupByKeyTranslatorBatch());
-
-    TRANSLATORS.put(AvroIO.Read.Bound.class, new AvroIOReadTranslatorBatch());
-    TRANSLATORS.put(AvroIO.Write.Bound.class, new 
AvroIOWriteTranslatorBatch());
-
-    TRANSLATORS.put(Read.Bounded.class, new ReadSourceTranslatorBatch());
-    TRANSLATORS.put(Write.Bound.class, new WriteSinkTranslatorBatch());
-
-    TRANSLATORS.put(TextIO.Read.Bound.class, new TextIOReadTranslatorBatch());
-    TRANSLATORS.put(TextIO.Write.Bound.class, new 
TextIOWriteTranslatorBatch());
-
-    // Flink-specific
-    TRANSLATORS.put(ConsoleIO.Write.Bound.class, new 
ConsoleIOWriteTranslatorBatch());
-
-  }
-
-
-  public static FlinkBatchPipelineTranslator.BatchTransformTranslator<?> 
getTranslator(PTransform<?, ?> transform) {
-    return TRANSLATORS.get(transform.getClass());
-  }
-
-  private static class ReadSourceTranslatorBatch<T> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<Read.Bounded<T>> {
-
-    @Override
-    public void translateNode(Read.Bounded<T> transform, 
FlinkBatchTranslationContext context) {
-      String name = transform.getName();
-      BoundedSource<T> source = transform.getSource();
-      PCollection<T> output = context.getOutput(transform);
-      Coder<T> coder = output.getCoder();
-
-      TypeInformation<T> typeInformation = context.getTypeInfo(output);
-
-      DataSource<T> dataSource = new 
DataSource<>(context.getExecutionEnvironment(),
-          new SourceInputFormat<>(source, context.getPipelineOptions()), 
typeInformation, name);
-
-      context.setOutputDataSet(output, dataSource);
-    }
-  }
-
-  private static class AvroIOReadTranslatorBatch<T> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<AvroIO.Read.Bound<T>> {
-    private static final Logger LOG = 
LoggerFactory.getLogger(AvroIOReadTranslatorBatch.class);
-
-    @Override
-    public void translateNode(AvroIO.Read.Bound<T> transform, 
FlinkBatchTranslationContext context) {
-      String path = transform.getFilepattern();
-      String name = transform.getName();
-//      Schema schema = transform.getSchema();
-      PValue output = context.getOutput(transform);
-
-      TypeInformation<T> typeInformation = context.getTypeInfo(output);
-
-      // This is super hacky, but unfortunately we cannot get the type 
otherwise
-      Class<T> extractedAvroType;
-      try {
-        Field typeField = transform.getClass().getDeclaredField("type");
-        typeField.setAccessible(true);
-        @SuppressWarnings("unchecked")
-        Class<T> avroType = (Class<T>) typeField.get(transform);
-        extractedAvroType = avroType;
-      } catch (NoSuchFieldException | IllegalAccessException e) {
-        // we know that the field is there and it is accessible
-        throw new RuntimeException("Could not access type from AvroIO.Bound", 
e);
-      }
-
-      DataSource<T> source = new 
DataSource<>(context.getExecutionEnvironment(),
-          new AvroInputFormat<>(new Path(path), extractedAvroType),
-          typeInformation, name);
-
-      context.setOutputDataSet(output, source);
-    }
-  }
-
-  private static class AvroIOWriteTranslatorBatch<T> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<AvroIO.Write.Bound<T>> {
-    private static final Logger LOG = 
LoggerFactory.getLogger(AvroIOWriteTranslatorBatch.class);
-
-    @Override
-    public void translateNode(AvroIO.Write.Bound<T> transform, 
FlinkBatchTranslationContext context) {
-      DataSet<T> inputDataSet = 
context.getInputDataSet(context.getInput(transform));
-      String filenamePrefix = transform.getFilenamePrefix();
-      String filenameSuffix = transform.getFilenameSuffix();
-      int numShards = transform.getNumShards();
-      String shardNameTemplate = transform.getShardNameTemplate();
-
-      // TODO: Implement these. We need Flink support for this.
-      LOG.warn("Translation of TextIO.Write.filenameSuffix not yet supported. 
Is: {}.",
-          filenameSuffix);
-      LOG.warn("Translation of TextIO.Write.shardNameTemplate not yet 
supported. Is: {}.", shardNameTemplate);
-
-      // This is super hacky, but unfortunately we cannot get the type 
otherwise
-      Class<T> extractedAvroType;
-      try {
-        Field typeField = transform.getClass().getDeclaredField("type");
-        typeField.setAccessible(true);
-        @SuppressWarnings("unchecked")
-        Class<T> avroType = (Class<T>) typeField.get(transform);
-        extractedAvroType = avroType;
-      } catch (NoSuchFieldException | IllegalAccessException e) {
-        // we know that the field is there and it is accessible
-        throw new RuntimeException("Could not access type from AvroIO.Bound", 
e);
-      }
-
-      DataSink<T> dataSink = inputDataSet.output(new AvroOutputFormat<>(new 
Path
-          (filenamePrefix), extractedAvroType));
-
-      if (numShards > 0) {
-        dataSink.setParallelism(numShards);
-      }
-    }
-  }
-
-  private static class TextIOReadTranslatorBatch implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<TextIO.Read.Bound<String>>
 {
-    private static final Logger LOG = 
LoggerFactory.getLogger(TextIOReadTranslatorBatch.class);
-
-    @Override
-    public void translateNode(TextIO.Read.Bound<String> transform, 
FlinkBatchTranslationContext context) {
-      String path = transform.getFilepattern();
-      String name = transform.getName();
-
-      TextIO.CompressionType compressionType = transform.getCompressionType();
-      boolean needsValidation = transform.needsValidation();
-
-      // TODO: Implement these. We need Flink support for this.
-      LOG.warn("Translation of TextIO.CompressionType not yet supported. Is: 
{}.", compressionType);
-      LOG.warn("Translation of TextIO.Read.needsValidation not yet supported. 
Is: {}.", needsValidation);
-
-      PValue output = context.getOutput(transform);
-
-      TypeInformation<String> typeInformation = context.getTypeInfo(output);
-      DataSource<String> source = new 
DataSource<>(context.getExecutionEnvironment(), new TextInputFormat(new 
Path(path)), typeInformation, name);
-
-      context.setOutputDataSet(output, source);
-    }
-  }
-
-  private static class TextIOWriteTranslatorBatch<T> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<TextIO.Write.Bound<T>> {
-    private static final Logger LOG = 
LoggerFactory.getLogger(TextIOWriteTranslatorBatch.class);
-
-    @Override
-    public void translateNode(TextIO.Write.Bound<T> transform, 
FlinkBatchTranslationContext context) {
-      PValue input = context.getInput(transform);
-      DataSet<T> inputDataSet = context.getInputDataSet(input);
-
-      String filenamePrefix = transform.getFilenamePrefix();
-      String filenameSuffix = transform.getFilenameSuffix();
-      boolean needsValidation = transform.needsValidation();
-      int numShards = transform.getNumShards();
-      String shardNameTemplate = transform.getShardNameTemplate();
-
-      // TODO: Implement these. We need Flink support for this.
-      LOG.warn("Translation of TextIO.Write.needsValidation not yet supported. 
Is: {}.", needsValidation);
-      LOG.warn("Translation of TextIO.Write.filenameSuffix not yet supported. 
Is: {}.", filenameSuffix);
-      LOG.warn("Translation of TextIO.Write.shardNameTemplate not yet 
supported. Is: {}.", shardNameTemplate);
-
-      //inputDataSet.print();
-      DataSink<T> dataSink = inputDataSet.writeAsText(filenamePrefix);
-
-      if (numShards > 0) {
-        dataSink.setParallelism(numShards);
-      }
-    }
-  }
-
-  private static class ConsoleIOWriteTranslatorBatch implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<ConsoleIO.Write.Bound> {
-    @Override
-    public void translateNode(ConsoleIO.Write.Bound transform, 
FlinkBatchTranslationContext context) {
-      PValue input = context.getInput(transform);
-      DataSet<?> inputDataSet = context.getInputDataSet(input);
-      inputDataSet.printOnTaskManager(transform.getName());
-    }
-  }
-
-  private static class WriteSinkTranslatorBatch<T> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<Write.Bound<T>> {
-
-    @Override
-    public void translateNode(Write.Bound<T> transform, 
FlinkBatchTranslationContext context) {
-      String name = transform.getName();
-      PValue input = context.getInput(transform);
-      DataSet<T> inputDataSet = context.getInputDataSet(input);
-
-      inputDataSet.output(new SinkOutputFormat<>(transform, 
context.getPipelineOptions())).name(name);
-    }
-  }
-
-  private static class GroupByKeyOnlyTranslatorBatch<K, V> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey.GroupByKeyOnly<K,
 V>> {
-
-    @Override
-    public void translateNode(GroupByKey.GroupByKeyOnly<K, V> transform, 
FlinkBatchTranslationContext context) {
-      DataSet<KV<K, V>> inputDataSet = 
context.getInputDataSet(context.getInput(transform));
-      GroupReduceFunction<KV<K, V>, KV<K, Iterable<V>>> groupReduceFunction = 
new FlinkKeyedListAggregationFunction<>();
-
-      TypeInformation<KV<K, Iterable<V>>> typeInformation = 
context.getTypeInfo(context.getOutput(transform));
-
-      Grouping<KV<K, V>> grouping = new UnsortedGrouping<>(inputDataSet, new 
Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet.getType()));
-
-      GroupReduceOperator<KV<K, V>, KV<K, Iterable<V>>> outputDataSet =
-          new GroupReduceOperator<>(grouping, typeInformation, 
groupReduceFunction, transform.getName());
-      context.setOutputDataSet(context.getOutput(transform), outputDataSet);
-    }
-  }
-
-  /**
-   * Translates a GroupByKey while ignoring window assignments. This is 
identical to the {@link GroupByKeyOnlyTranslatorBatch}
-   */
-  private static class GroupByKeyTranslatorBatch<K, V> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey<K, V>> {
-
-    @Override
-    public void translateNode(GroupByKey<K, V> transform, 
FlinkBatchTranslationContext context) {
-      DataSet<KV<K, V>> inputDataSet = 
context.getInputDataSet(context.getInput(transform));
-      GroupReduceFunction<KV<K, V>, KV<K, Iterable<V>>> groupReduceFunction = 
new FlinkKeyedListAggregationFunction<>();
-
-      TypeInformation<KV<K, Iterable<V>>> typeInformation = 
context.getTypeInfo(context.getOutput(transform));
-
-      Grouping<KV<K, V>> grouping = new UnsortedGrouping<>(inputDataSet, new 
Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet.getType()));
-
-      GroupReduceOperator<KV<K, V>, KV<K, Iterable<V>>> outputDataSet =
-          new GroupReduceOperator<>(grouping, typeInformation, 
groupReduceFunction, transform.getName());
-
-      context.setOutputDataSet(context.getOutput(transform), outputDataSet);
-    }
-  }
-
-  private static class CombinePerKeyTranslatorBatch<K, VI, VA, VO> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<Combine.PerKey<K, VI, 
VO>> {
-
-    @Override
-    public void translateNode(Combine.PerKey<K, VI, VO> transform, 
FlinkBatchTranslationContext context) {
-      DataSet<KV<K, VI>> inputDataSet = 
context.getInputDataSet(context.getInput(transform));
-
-      @SuppressWarnings("unchecked")
-      Combine.KeyedCombineFn<K, VI, VA, VO> keyedCombineFn = 
(Combine.KeyedCombineFn<K, VI, VA, VO>) transform.getFn();
-
-      KvCoder<K, VI> inputCoder = (KvCoder<K, VI>) 
context.getInput(transform).getCoder();
-
-      Coder<VA> accumulatorCoder =
-          null;
-      try {
-        accumulatorCoder = 
keyedCombineFn.getAccumulatorCoder(context.getInput(transform).getPipeline().getCoderRegistry(),
 inputCoder.getKeyCoder(), inputCoder.getValueCoder());
-      } catch (CannotProvideCoderException e) {
-        e.printStackTrace();
-        // TODO
-      }
-
-      TypeInformation<KV<K, VI>> kvCoderTypeInformation = new 
KvCoderTypeInformation<>(inputCoder);
-      TypeInformation<KV<K, VA>> partialReduceTypeInfo = new 
KvCoderTypeInformation<>(KvCoder.of(inputCoder.getKeyCoder(), 
accumulatorCoder));
-
-      Grouping<KV<K, VI>> inputGrouping = new UnsortedGrouping<>(inputDataSet, 
new Keys.ExpressionKeys<>(new String[]{"key"}, kvCoderTypeInformation));
-
-      FlinkPartialReduceFunction<K, VI, VA> partialReduceFunction = new 
FlinkPartialReduceFunction<>(keyedCombineFn);
-
-      // Partially GroupReduce the values into the intermediate format VA 
(combine)
-      GroupCombineOperator<KV<K, VI>, KV<K, VA>> groupCombine =
-          new GroupCombineOperator<>(inputGrouping, partialReduceTypeInfo, 
partialReduceFunction,
-              "GroupCombine: " + transform.getName());
-
-      // Reduce fully to VO
-      GroupReduceFunction<KV<K, VA>, KV<K, VO>> reduceFunction = new 
FlinkReduceFunction<>(keyedCombineFn);
-
-      TypeInformation<KV<K, VO>> reduceTypeInfo = 
context.getTypeInfo(context.getOutput(transform));
-
-      Grouping<KV<K, VA>> intermediateGrouping = new 
UnsortedGrouping<>(groupCombine, new Keys.ExpressionKeys<>(new String[]{"key"}, 
groupCombine.getType()));
-
-      // Fully reduce the values and create output format VO
-      GroupReduceOperator<KV<K, VA>, KV<K, VO>> outputDataSet =
-          new GroupReduceOperator<>(intermediateGrouping, reduceTypeInfo, 
reduceFunction, transform.getName());
-
-      context.setOutputDataSet(context.getOutput(transform), outputDataSet);
-    }
-  }
-
-//  private static class CombineGroupedValuesTranslator<K, VI, VO> implements 
FlinkPipelineTranslator.TransformTranslator<Combine.GroupedValues<K, VI, VO>> {
-//
-//    @Override
-//    public void translateNode(Combine.GroupedValues<K, VI, VO> transform, 
TranslationContext context) {
-//      DataSet<KV<K, VI>> inputDataSet = 
context.getInputDataSet(transform.getInput());
-//
-//      Combine.KeyedCombineFn<? super K, ? super VI, ?, VO> keyedCombineFn = 
transform.getFn();
-//
-//      GroupReduceFunction<KV<K, VI>, KV<K, VO>> groupReduceFunction = new 
FlinkCombineFunction<>(keyedCombineFn);
-//
-//      TypeInformation<KV<K, VO>> typeInformation = 
context.getTypeInfo(transform.getOutput());
-//
-//      Grouping<KV<K, VI>> grouping = new UnsortedGrouping<>(inputDataSet, 
new Keys.ExpressionKeys<>(new String[]{""}, inputDataSet.getType()));
-//
-//      GroupReduceOperator<KV<K, VI>, KV<K, VO>> outputDataSet =
-//          new GroupReduceOperator<>(grouping, typeInformation, 
groupReduceFunction, transform.getName());
-//      context.setOutputDataSet(transform.getOutput(), outputDataSet);
-//    }
-//  }
-  
-  private static class ParDoBoundTranslatorBatch<IN, OUT> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<ParDo.Bound<IN, OUT>> {
-    private static final Logger LOG = 
LoggerFactory.getLogger(ParDoBoundTranslatorBatch.class);
-
-    @Override
-    public void translateNode(ParDo.Bound<IN, OUT> transform, 
FlinkBatchTranslationContext context) {
-      DataSet<IN> inputDataSet = 
context.getInputDataSet(context.getInput(transform));
-
-      final DoFn<IN, OUT> doFn = transform.getFn();
-
-      TypeInformation<OUT> typeInformation = 
context.getTypeInfo(context.getOutput(transform));
-
-      FlinkDoFnFunction<IN, OUT> doFnWrapper = new FlinkDoFnFunction<>(doFn, 
context.getPipelineOptions());
-      MapPartitionOperator<IN, OUT> outputDataSet = new 
MapPartitionOperator<>(inputDataSet, typeInformation, doFnWrapper, 
transform.getName());
-
-      transformSideInputs(transform.getSideInputs(), outputDataSet, context);
-
-      context.setOutputDataSet(context.getOutput(transform), outputDataSet);
-    }
-  }
-
-  private static class ParDoBoundMultiTranslatorBatch<IN, OUT> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<ParDo.BoundMulti<IN, 
OUT>> {
-    private static final Logger LOG = 
LoggerFactory.getLogger(ParDoBoundMultiTranslatorBatch.class);
-
-    @Override
-    public void translateNode(ParDo.BoundMulti<IN, OUT> transform, 
FlinkBatchTranslationContext context) {
-      DataSet<IN> inputDataSet = 
context.getInputDataSet(context.getInput(transform));
-
-      final DoFn<IN, OUT> doFn = transform.getFn();
-
-      Map<TupleTag<?>, PCollection<?>> outputs = 
context.getOutput(transform).getAll();
-
-      Map<TupleTag<?>, Integer> outputMap = Maps.newHashMap();
-      // put the main output at index 0, FlinkMultiOutputDoFnFunction also 
expects this
-      outputMap.put(transform.getMainOutputTag(), 0);
-      int count = 1;
-      for (TupleTag<?> tag: outputs.keySet()) {
-        if (!outputMap.containsKey(tag)) {
-          outputMap.put(tag, count++);
-        }
-      }
-
-      // collect all output Coders and create a UnionCoder for our tagged 
outputs
-      List<Coder<?>> outputCoders = Lists.newArrayList();
-      for (PCollection<?> coll: outputs.values()) {
-        outputCoders.add(coll.getCoder());
-      }
-
-      UnionCoder unionCoder = UnionCoder.of(outputCoders);
-
-      @SuppressWarnings("unchecked")
-      TypeInformation<RawUnionValue> typeInformation = new 
CoderTypeInformation<>(unionCoder);
-
-      @SuppressWarnings("unchecked")
-      FlinkMultiOutputDoFnFunction<IN, OUT> doFnWrapper = new 
FlinkMultiOutputDoFnFunction(doFn, context.getPipelineOptions(), outputMap);
-      MapPartitionOperator<IN, RawUnionValue> outputDataSet = new 
MapPartitionOperator<>(inputDataSet, typeInformation, doFnWrapper, 
transform.getName());
-
-      transformSideInputs(transform.getSideInputs(), outputDataSet, context);
-
-      for (Map.Entry<TupleTag<?>, PCollection<?>> output: outputs.entrySet()) {
-        TypeInformation<Object> outputType = 
context.getTypeInfo(output.getValue());
-        int outputTag = outputMap.get(output.getKey());
-        FlinkMultiOutputPruningFunction<Object> pruningFunction = new 
FlinkMultiOutputPruningFunction<>(outputTag);
-        FlatMapOperator<RawUnionValue, Object> pruningOperator = new
-            FlatMapOperator<>(outputDataSet, outputType,
-            pruningFunction, output.getValue().getName());
-        context.setOutputDataSet(output.getValue(), pruningOperator);
-
-      }
-    }
-  }
-
-  private static class FlattenPCollectionTranslatorBatch<T> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<Flatten.FlattenPCollectionList<T>>
 {
-
-    @Override
-    public void translateNode(Flatten.FlattenPCollectionList<T> transform, 
FlinkBatchTranslationContext context) {
-      List<PCollection<T>> allInputs = context.getInput(transform).getAll();
-      DataSet<T> result = null;
-      for(PCollection<T> collection : allInputs) {
-        DataSet<T> current = context.getInputDataSet(collection);
-        if (result == null) {
-          result = current;
-        } else {
-          result = result.union(current);
-        }
-      }
-      context.setOutputDataSet(context.getOutput(transform), result);
-    }
-  }
-
-  private static class CreatePCollectionViewTranslatorBatch<R, T> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<View.CreatePCollectionView<R,
 T>> {
-    @Override
-    public void translateNode(View.CreatePCollectionView<R, T> transform, 
FlinkBatchTranslationContext context) {
-      DataSet<T> inputDataSet = 
context.getInputDataSet(context.getInput(transform));
-      PCollectionView<T> input = transform.apply(null);
-      context.setSideInputDataSet(input, inputDataSet);
-    }
-  }
-
-  private static class CreateTranslatorBatch<OUT> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<Create.Values<OUT>> {
-
-    @Override
-    public void translateNode(Create.Values<OUT> transform, 
FlinkBatchTranslationContext context) {
-      TypeInformation<OUT> typeInformation = context.getOutputTypeInfo();
-      Iterable<OUT> elements = transform.getElements();
-
-      // we need to serialize the elements to byte arrays, since they might 
contain
-      // elements that are not serializable by Java serialization. We 
deserialize them
-      // in the FlatMap function using the Coder.
-
-      List<byte[]> serializedElements = Lists.newArrayList();
-      Coder<OUT> coder = context.getOutput(transform).getCoder();
-      for (OUT element: elements) {
-        ByteArrayOutputStream bao = new ByteArrayOutputStream();
-        try {
-          coder.encode(element, bao, Coder.Context.OUTER);
-          serializedElements.add(bao.toByteArray());
-        } catch (IOException e) {
-          throw new RuntimeException("Could not serialize Create elements 
using Coder: " + e);
-        }
-      }
-
-      DataSet<Integer> initDataSet = 
context.getExecutionEnvironment().fromElements(1);
-      FlinkCreateFunction<Integer, OUT> flatMapFunction = new 
FlinkCreateFunction<>(serializedElements, coder);
-      FlatMapOperator<Integer, OUT> outputDataSet = new 
FlatMapOperator<>(initDataSet, typeInformation, flatMapFunction, 
transform.getName());
-
-      context.setOutputDataSet(context.getOutput(transform), outputDataSet);
-    }
-  }
-
-  private static void transformSideInputs(List<PCollectionView<?>> sideInputs,
-                                          MapPartitionOperator<?, ?> 
outputDataSet,
-                                          FlinkBatchTranslationContext 
context) {
-    // get corresponding Flink broadcast DataSets
-    for(PCollectionView<?> input : sideInputs) {
-      DataSet<?> broadcastSet = context.getSideInputDataSet(input);
-      outputDataSet.withBroadcastSet(broadcastSet, 
input.getTagInternal().getId());
-    }
-  }
-
-// Disabled because it depends on a pending pull request to the DataFlowSDK
-  /**
-   * Special composite transform translator. Only called if the CoGroup is two 
dimensional.
-   * @param <K>
-   */
-  private static class CoGroupByKeyTranslatorBatch<K, V1, V2> implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<CoGroupByKey<K>> {
-
-    @Override
-    public void translateNode(CoGroupByKey<K> transform, 
FlinkBatchTranslationContext context) {
-      KeyedPCollectionTuple<K> input = context.getInput(transform);
-
-      CoGbkResultSchema schema = input.getCoGbkResultSchema();
-      List<KeyedPCollectionTuple.TaggedKeyedPCollection<K, ?>> 
keyedCollections = input.getKeyedCollections();
-
-      KeyedPCollectionTuple.TaggedKeyedPCollection<K, ?> taggedCollection1 = 
keyedCollections.get(0);
-      KeyedPCollectionTuple.TaggedKeyedPCollection<K, ?> taggedCollection2 = 
keyedCollections.get(1);
-
-      TupleTag<?> tupleTag1 = taggedCollection1.getTupleTag();
-      TupleTag<?> tupleTag2 = taggedCollection2.getTupleTag();
-
-      PCollection<? extends KV<K, ?>> collection1 = 
taggedCollection1.getCollection();
-      PCollection<? extends KV<K, ?>> collection2 = 
taggedCollection2.getCollection();
-
-      DataSet<KV<K,V1>> inputDataSet1 = context.getInputDataSet(collection1);
-      DataSet<KV<K,V2>> inputDataSet2 = context.getInputDataSet(collection2);
-
-      TypeInformation<KV<K,CoGbkResult>> typeInfo = 
context.getOutputTypeInfo();
-
-      FlinkCoGroupKeyedListAggregator<K,V1,V2> aggregator = new 
FlinkCoGroupKeyedListAggregator<>(schema, tupleTag1, tupleTag2);
-
-      Keys.ExpressionKeys<KV<K,V1>> keySelector1 = new 
Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet1.getType());
-      Keys.ExpressionKeys<KV<K,V2>> keySelector2 = new 
Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet2.getType());
-
-      DataSet<KV<K, CoGbkResult>> out = new CoGroupOperator<>(inputDataSet1, 
inputDataSet2,
-                                  keySelector1, keySelector2,
-                                                          aggregator, 
typeInfo, null, transform.getName());
-      context.setOutputDataSet(context.getOutput(transform), out);
-    }
-  }
-
-  // 
--------------------------------------------------------------------------------------------
-  //  Miscellaneous
-  // 
--------------------------------------------------------------------------------------------
-  
-  private FlinkBatchTransformTranslators() {}
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java
deleted file mode 100644
index 2294318..0000000
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation;
-
-import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.runners.flink.translation.types.KvCoderTypeInformation;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-import com.google.cloud.dataflow.sdk.values.PValue;
-import com.google.cloud.dataflow.sdk.values.TypedPValue;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class FlinkBatchTranslationContext {
-  
-  private final Map<PValue, DataSet<?>> dataSets;
-  private final Map<PCollectionView<?>, DataSet<?>> broadcastDataSets;
-
-  private final ExecutionEnvironment env;
-  private final PipelineOptions options;
-
-  private AppliedPTransform<?, ?, ?> currentTransform;
-  
-  // ------------------------------------------------------------------------
-  
-  public FlinkBatchTranslationContext(ExecutionEnvironment env, 
PipelineOptions options) {
-    this.env = env;
-    this.options = options;
-    this.dataSets = new HashMap<>();
-    this.broadcastDataSets = new HashMap<>();
-  }
-  
-  // ------------------------------------------------------------------------
-  
-  public ExecutionEnvironment getExecutionEnvironment() {
-    return env;
-  }
-
-  public PipelineOptions getPipelineOptions() {
-    return options;
-  }
-  
-  @SuppressWarnings("unchecked")
-  public <T> DataSet<T> getInputDataSet(PValue value) {
-    return (DataSet<T>) dataSets.get(value);
-  }
-
-  public void setOutputDataSet(PValue value, DataSet<?> set) {
-    if (!dataSets.containsKey(value)) {
-      dataSets.put(value, set);
-    }
-  }
-
-  /**
-   * Sets the AppliedPTransform which carries input/output.
-   * @param currentTransform
-   */
-  public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) 
{
-    this.currentTransform = currentTransform;
-  }
-
-  @SuppressWarnings("unchecked")
-  public <T> DataSet<T> getSideInputDataSet(PCollectionView<?> value) {
-    return (DataSet<T>) broadcastDataSets.get(value);
-  }
-
-  public void setSideInputDataSet(PCollectionView<?> value, DataSet<?> set) {
-    if (!broadcastDataSets.containsKey(value)) {
-      broadcastDataSets.put(value, set);
-    }
-  }
-  
-  @SuppressWarnings("unchecked")
-  public <T> TypeInformation<T> getTypeInfo(PInput output) {
-    if (output instanceof TypedPValue) {
-      Coder<?> outputCoder = ((TypedPValue) output).getCoder();
-      if (outputCoder instanceof KvCoder) {
-        return new KvCoderTypeInformation((KvCoder) outputCoder);
-      } else {
-        return new CoderTypeInformation(outputCoder);
-      }
-    }
-    return new GenericTypeInfo<>((Class<T>)Object.class);
-  }
-
-  public <T> TypeInformation<T> getInputTypeInfo() {
-    return getTypeInfo(currentTransform.getInput());
-  }
-
-  public <T> TypeInformation<T> getOutputTypeInfo() {
-    return getTypeInfo((PValue) currentTransform.getOutput());
-  }
-
-  @SuppressWarnings("unchecked")
-  <I extends PInput> I getInput(PTransform<I, ?> transform) {
-    return (I) currentTransform.getInput();
-  }
-
-  @SuppressWarnings("unchecked")
-  <O extends POutput> O getOutput(PTransform<?, O> transform) {
-    return (O) currentTransform.getOutput();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.java
deleted file mode 100644
index 9407bf5..0000000
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-
-/**
- * The role of this class is to translate the Beam operators to
- * their Flink counterparts. If we have a streaming job, this is instantiated 
as a
- * {@link FlinkStreamingPipelineTranslator}. In other case, i.e. for a batch 
job,
- * a {@link FlinkBatchPipelineTranslator} is created. Correspondingly, the
- * {@link com.google.cloud.dataflow.sdk.values.PCollection}-based 
user-provided job is translated into
- * a {@link org.apache.flink.streaming.api.datastream.DataStream} (for 
streaming) or a
- * {@link org.apache.flink.api.java.DataSet} (for batch) one.
- */
-public abstract class FlinkPipelineTranslator implements 
Pipeline.PipelineVisitor {
-
-  public void translate(Pipeline pipeline) {
-    pipeline.traverseTopologically(this);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
deleted file mode 100644
index ac96807..0000000
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.values.PValue;
-import org.apache.beam.runners.flink.FlinkPipelineRunner;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This is a {@link FlinkPipelineTranslator} for streaming jobs. Its role is 
to translate the user-provided
- * {@link com.google.cloud.dataflow.sdk.values.PCollection}-based job into a
- * {@link org.apache.flink.streaming.api.datastream.DataStream} one.
- *
- * This is based on {@link 
com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator}
- * */
-public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkStreamingPipelineTranslator.class);
-
-  /** The necessary context in the case of a straming job. */
-  private final FlinkStreamingTranslationContext streamingContext;
-
-  private int depth = 0;
-
-  /** Composite transform that we want to translate before proceeding with 
other transforms. */
-  private PTransform<?, ?> currentCompositeTransform;
-
-  public FlinkStreamingPipelineTranslator(StreamExecutionEnvironment env, 
PipelineOptions options) {
-    this.streamingContext = new FlinkStreamingTranslationContext(env, options);
-  }
-
-  // 
--------------------------------------------------------------------------------------------
-  //  Pipeline Visitor Methods
-  // 
--------------------------------------------------------------------------------------------
-
-  @Override
-  public void enterCompositeTransform(TransformTreeNode node) {
-    LOG.info(genSpaces(this.depth) + "enterCompositeTransform- " + 
formatNodeName(node));
-
-    PTransform<?, ?> transform = node.getTransform();
-    if (transform != null && currentCompositeTransform == null) {
-
-      StreamTransformTranslator<?> translator = 
FlinkStreamingTransformTranslators.getTranslator(transform);
-      if (translator != null) {
-        currentCompositeTransform = transform;
-      }
-    }
-    this.depth++;
-  }
-
-  @Override
-  public void leaveCompositeTransform(TransformTreeNode node) {
-    PTransform<?, ?> transform = node.getTransform();
-    if (transform != null && currentCompositeTransform == transform) {
-
-      StreamTransformTranslator<?> translator = 
FlinkStreamingTransformTranslators.getTranslator(transform);
-      if (translator != null) {
-        LOG.info(genSpaces(this.depth) + "doingCompositeTransform- " + 
formatNodeName(node));
-        applyStreamingTransform(transform, node, translator);
-        currentCompositeTransform = null;
-      } else {
-        throw new IllegalStateException("Attempted to translate composite 
transform " +
-            "but no translator was found: " + currentCompositeTransform);
-      }
-    }
-    this.depth--;
-    LOG.info(genSpaces(this.depth) + "leaveCompositeTransform- " + 
formatNodeName(node));
-  }
-
-  @Override
-  public void visitTransform(TransformTreeNode node) {
-    LOG.info(genSpaces(this.depth) + "visitTransform- " + 
formatNodeName(node));
-    if (currentCompositeTransform != null) {
-      // ignore it
-      return;
-    }
-
-    // get the transformation corresponding to hte node we are
-    // currently visiting and translate it into its Flink alternative.
-
-    PTransform<?, ?> transform = node.getTransform();
-    StreamTransformTranslator<?> translator = 
FlinkStreamingTransformTranslators.getTranslator(transform);
-    if (translator == null) {
-      LOG.info(node.getTransform().getClass().toString());
-      throw new UnsupportedOperationException("The transform " + transform + " 
is currently not supported.");
-    }
-    applyStreamingTransform(transform, node, translator);
-  }
-
-  @Override
-  public void visitValue(PValue value, TransformTreeNode producer) {
-    // do nothing here
-  }
-
-  private <T extends PTransform<?, ?>> void 
applyStreamingTransform(PTransform<?, ?> transform, TransformTreeNode node, 
StreamTransformTranslator<?> translator) {
-
-    @SuppressWarnings("unchecked")
-    T typedTransform = (T) transform;
-
-    @SuppressWarnings("unchecked")
-    StreamTransformTranslator<T> typedTranslator = 
(StreamTransformTranslator<T>) translator;
-
-    // create the applied PTransform on the streamingContext
-    streamingContext.setCurrentTransform(AppliedPTransform.of(
-        node.getFullName(), node.getInput(), node.getOutput(), (PTransform) 
transform));
-    typedTranslator.translateNode(typedTransform, streamingContext);
-  }
-
-  /**
-   * The interface that every Flink translator of a Beam operator should 
implement.
-   * This interface is for <b>streaming</b> jobs. For examples of such 
translators see
-   * {@link FlinkStreamingTransformTranslators}.
-   */
-  public interface StreamTransformTranslator<Type extends PTransform> {
-    void translateNode(Type transform, FlinkStreamingTranslationContext 
context);
-  }
-
-  private static String genSpaces(int n) {
-    String s = "";
-    for (int i = 0; i < n; i++) {
-      s += "|   ";
-    }
-    return s;
-  }
-
-  private static String formatNodeName(TransformTreeNode node) {
-    return node.toString().split("@")[1] + node.getTransform();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
deleted file mode 100644
index bdefeaf..0000000
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ /dev/null
@@ -1,406 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.flink.translation;
-
-import org.apache.beam.runners.flink.translation.functions.UnionCoder;
-import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.*;
-import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.FlinkStreamingCreateFunction;
-import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource;
-import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
-import com.google.api.client.util.Maps;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.io.Read;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.transforms.*;
-import com.google.cloud.dataflow.sdk.transforms.join.RawUnionValue;
-import com.google.cloud.dataflow.sdk.transforms.windowing.*;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PValue;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.common.collect.Lists;
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.streaming.api.datastream.*;
-import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.*;
-
-/**
- * This class contains all the mappings between Beam and Flink
- * <b>streaming</b> transformations. The {@link 
FlinkStreamingPipelineTranslator}
- * traverses the Beam job and comes here to translate the encountered Beam 
transformations
- * into Flink one, based on the mapping available in this class.
- */
-public class FlinkStreamingTransformTranslators {
-
-  // 
--------------------------------------------------------------------------------------------
-  //  Transform Translator Registry
-  // 
--------------------------------------------------------------------------------------------
-
-  @SuppressWarnings("rawtypes")
-  private static final Map<Class<? extends PTransform>, 
FlinkStreamingPipelineTranslator.StreamTransformTranslator> TRANSLATORS = new 
HashMap<>();
-
-  // here you can find all the available translators.
-  static {
-    TRANSLATORS.put(Create.Values.class, new CreateStreamingTranslator());
-    TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator());
-    TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundStreamingTranslator());
-    TRANSLATORS.put(TextIO.Write.Bound.class, new 
TextIOWriteBoundStreamingTranslator());
-    TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator());
-    TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator());
-    TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator());
-    TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new 
FlattenPCollectionTranslator());
-    TRANSLATORS.put(ParDo.BoundMulti.class, new 
ParDoBoundMultiStreamingTranslator());
-  }
-
-  public static FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> 
getTranslator(PTransform<?, ?> transform) {
-    return TRANSLATORS.get(transform.getClass());
-  }
-
-  // 
--------------------------------------------------------------------------------------------
-  //  Transformation Implementations
-  // 
--------------------------------------------------------------------------------------------
-
-  private static class CreateStreamingTranslator<OUT> implements
-      
FlinkStreamingPipelineTranslator.StreamTransformTranslator<Create.Values<OUT>> {
-
-    @Override
-    public void translateNode(Create.Values<OUT> transform, 
FlinkStreamingTranslationContext context) {
-      PCollection<OUT> output = context.getOutput(transform);
-      Iterable<OUT> elements = transform.getElements();
-
-      // we need to serialize the elements to byte arrays, since they might 
contain
-      // elements that are not serializable by Java serialization. We 
deserialize them
-      // in the FlatMap function using the Coder.
-
-      List<byte[]> serializedElements = Lists.newArrayList();
-      Coder<OUT> elementCoder = context.getOutput(transform).getCoder();
-      for (OUT element: elements) {
-        ByteArrayOutputStream bao = new ByteArrayOutputStream();
-        try {
-          elementCoder.encode(element, bao, Coder.Context.OUTER);
-          serializedElements.add(bao.toByteArray());
-        } catch (IOException e) {
-          throw new RuntimeException("Could not serialize Create elements 
using Coder: " + e);
-        }
-      }
-
-
-      DataStream<Integer> initDataSet = 
context.getExecutionEnvironment().fromElements(1);
-
-      FlinkStreamingCreateFunction<Integer, OUT> createFunction =
-          new FlinkStreamingCreateFunction<>(serializedElements, elementCoder);
-
-      WindowedValue.ValueOnlyWindowedValueCoder<OUT> windowCoder = 
WindowedValue.getValueOnlyCoder(elementCoder);
-      TypeInformation<WindowedValue<OUT>> outputType = new 
CoderTypeInformation<>(windowCoder);
-
-      DataStream<WindowedValue<OUT>> outputDataStream = 
initDataSet.flatMap(createFunction)
-          .returns(outputType);
-
-      context.setOutputDataStream(context.getOutput(transform), 
outputDataStream);
-    }
-  }
-
-
-  private static class TextIOWriteBoundStreamingTranslator<T> implements 
FlinkStreamingPipelineTranslator.StreamTransformTranslator<TextIO.Write.Bound<T>>
 {
-    private static final Logger LOG = 
LoggerFactory.getLogger(TextIOWriteBoundStreamingTranslator.class);
-
-    @Override
-    public void translateNode(TextIO.Write.Bound<T> transform, 
FlinkStreamingTranslationContext context) {
-      PValue input = context.getInput(transform);
-      DataStream<WindowedValue<T>> inputDataStream = 
context.getInputDataStream(input);
-
-      String filenamePrefix = transform.getFilenamePrefix();
-      String filenameSuffix = transform.getFilenameSuffix();
-      boolean needsValidation = transform.needsValidation();
-      int numShards = transform.getNumShards();
-      String shardNameTemplate = transform.getShardNameTemplate();
-
-      // TODO: Implement these. We need Flink support for this.
-      LOG.warn("Translation of TextIO.Write.needsValidation not yet supported. 
Is: {}.", needsValidation);
-      LOG.warn("Translation of TextIO.Write.filenameSuffix not yet supported. 
Is: {}.", filenameSuffix);
-      LOG.warn("Translation of TextIO.Write.shardNameTemplate not yet 
supported. Is: {}.", shardNameTemplate);
-
-      DataStream<String> dataSink = inputDataStream.flatMap(new 
FlatMapFunction<WindowedValue<T>, String>() {
-        @Override
-        public void flatMap(WindowedValue<T> value, Collector<String> out) 
throws Exception {
-          out.collect(value.getValue().toString());
-        }
-      });
-      DataStreamSink<String> output = dataSink.writeAsText(filenamePrefix, 
FileSystem.WriteMode.OVERWRITE);
-
-      if (numShards > 0) {
-        output.setParallelism(numShards);
-      }
-    }
-  }
-
-  private static class UnboundedReadSourceTranslator<T> implements 
FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Unbounded<T>> {
-
-    @Override
-    public void translateNode(Read.Unbounded<T> transform, 
FlinkStreamingTranslationContext context) {
-      PCollection<T> output = context.getOutput(transform);
-
-      DataStream<WindowedValue<T>> source;
-      if (transform.getSource().getClass().equals(UnboundedFlinkSource.class)) 
{
-        UnboundedFlinkSource flinkSource = (UnboundedFlinkSource) 
transform.getSource();
-        source = context.getExecutionEnvironment()
-            .addSource(flinkSource.getFlinkSource())
-            .flatMap(new FlatMapFunction<String, WindowedValue<String>>() {
-              @Override
-              public void flatMap(String s, Collector<WindowedValue<String>> 
collector) throws Exception {
-                collector.collect(WindowedValue.<String>of(s, Instant.now(), 
GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
-              }
-            });
-      } else {
-        source = context.getExecutionEnvironment()
-            .addSource(new 
UnboundedSourceWrapper<>(context.getPipelineOptions(), transform));
-      }
-      context.setOutputDataStream(output, source);
-    }
-  }
-
-  private static class ParDoBoundStreamingTranslator<IN, OUT> implements 
FlinkStreamingPipelineTranslator.StreamTransformTranslator<ParDo.Bound<IN, 
OUT>> {
-
-    @Override
-    public void translateNode(ParDo.Bound<IN, OUT> transform, 
FlinkStreamingTranslationContext context) {
-      PCollection<OUT> output = context.getOutput(transform);
-
-      final WindowingStrategy<OUT, ? extends BoundedWindow> windowingStrategy =
-          (WindowingStrategy<OUT, ? extends BoundedWindow>)
-              context.getOutput(transform).getWindowingStrategy();
-
-      WindowedValue.WindowedValueCoder<OUT> outputStreamCoder = 
WindowedValue.getFullCoder(output.getCoder(),
-          windowingStrategy.getWindowFn().windowCoder());
-      CoderTypeInformation<WindowedValue<OUT>> outputWindowedValueCoder =
-          new CoderTypeInformation<>(outputStreamCoder);
-
-      FlinkParDoBoundWrapper<IN, OUT> doFnWrapper = new 
FlinkParDoBoundWrapper<>(
-          context.getPipelineOptions(), windowingStrategy, transform.getFn());
-      DataStream<WindowedValue<IN>> inputDataStream = 
context.getInputDataStream(context.getInput(transform));
-      SingleOutputStreamOperator<WindowedValue<OUT>> outDataStream = 
inputDataStream.flatMap(doFnWrapper)
-          .returns(outputWindowedValueCoder);
-
-      context.setOutputDataStream(context.getOutput(transform), outDataStream);
-    }
-  }
-
-  public static class WindowBoundTranslator<T> implements 
FlinkStreamingPipelineTranslator.StreamTransformTranslator<Window.Bound<T>> {
-
-    @Override
-    public void translateNode(Window.Bound<T> transform, 
FlinkStreamingTranslationContext context) {
-      PValue input = context.getInput(transform);
-      DataStream<WindowedValue<T>> inputDataStream = 
context.getInputDataStream(input);
-
-      final WindowingStrategy<T, ? extends BoundedWindow> windowingStrategy =
-          (WindowingStrategy<T, ? extends BoundedWindow>)
-              context.getOutput(transform).getWindowingStrategy();
-
-      final WindowFn<T, ? extends BoundedWindow> windowFn = 
windowingStrategy.getWindowFn();
-
-      WindowedValue.WindowedValueCoder<T> outputStreamCoder = 
WindowedValue.getFullCoder(
-          context.getInput(transform).getCoder(), 
windowingStrategy.getWindowFn().windowCoder());
-      CoderTypeInformation<WindowedValue<T>> outputWindowedValueCoder =
-          new CoderTypeInformation<>(outputStreamCoder);
-
-      final FlinkParDoBoundWrapper<T, T> windowDoFnAssigner = new 
FlinkParDoBoundWrapper<>(
-          context.getPipelineOptions(), windowingStrategy, 
createWindowAssigner(windowFn));
-
-      SingleOutputStreamOperator<WindowedValue<T>> windowedStream =
-          
inputDataStream.flatMap(windowDoFnAssigner).returns(outputWindowedValueCoder);
-      context.setOutputDataStream(context.getOutput(transform), 
windowedStream);
-    }
-
-    private static <T, W extends BoundedWindow> DoFn<T, T> 
createWindowAssigner(final WindowFn<T, W> windowFn) {
-      return new DoFn<T, T>() {
-
-        @Override
-        public void processElement(final ProcessContext c) throws Exception {
-          Collection<W> windows = windowFn.assignWindows(
-              windowFn.new AssignContext() {
-                @Override
-                public T element() {
-                  return c.element();
-                }
-
-                @Override
-                public Instant timestamp() {
-                  return c.timestamp();
-                }
-
-                @Override
-                public Collection<? extends BoundedWindow> windows() {
-                  return c.windowingInternals().windows();
-                }
-              });
-
-          c.windowingInternals().outputWindowedValue(
-              c.element(), c.timestamp(), windows, c.pane());
-        }
-      };
-    }
-  }
-
-  public static class GroupByKeyTranslator<K, V> implements 
FlinkStreamingPipelineTranslator.StreamTransformTranslator<GroupByKey<K, V>> {
-
-    @Override
-    public void translateNode(GroupByKey<K, V> transform, 
FlinkStreamingTranslationContext context) {
-      PValue input = context.getInput(transform);
-
-      DataStream<WindowedValue<KV<K, V>>> inputDataStream = 
context.getInputDataStream(input);
-      KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) 
context.getInput(transform).getCoder();
-
-      KeyedStream<WindowedValue<KV<K, V>>, K> groupByKStream = 
FlinkGroupByKeyWrapper
-          .groupStreamByKey(inputDataStream, inputKvCoder);
-
-      DataStream<WindowedValue<KV<K, Iterable<V>>>> groupedByKNWstream =
-          
FlinkGroupAlsoByWindowWrapper.createForIterable(context.getPipelineOptions(),
-              context.getInput(transform), groupByKStream);
-
-      context.setOutputDataStream(context.getOutput(transform), 
groupedByKNWstream);
-    }
-  }
-
-  public static class CombinePerKeyTranslator<K, VIN, VACC, VOUT> implements 
FlinkStreamingPipelineTranslator.StreamTransformTranslator<Combine.PerKey<K, 
VIN, VOUT>> {
-
-    @Override
-    public void translateNode(Combine.PerKey<K, VIN, VOUT> transform, 
FlinkStreamingTranslationContext context) {
-      PValue input = context.getInput(transform);
-
-      DataStream<WindowedValue<KV<K, VIN>>> inputDataStream = 
context.getInputDataStream(input);
-      KvCoder<K, VIN> inputKvCoder = (KvCoder<K, VIN>) 
context.getInput(transform).getCoder();
-      KvCoder<K, VOUT> outputKvCoder = (KvCoder<K, VOUT>) 
context.getOutput(transform).getCoder();
-
-      KeyedStream<WindowedValue<KV<K, VIN>>, K> groupByKStream = 
FlinkGroupByKeyWrapper
-          .groupStreamByKey(inputDataStream, inputKvCoder);
-
-      Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combineFn = 
(Combine.KeyedCombineFn<K, VIN, VACC, VOUT>) transform.getFn();
-      DataStream<WindowedValue<KV<K, VOUT>>> groupedByKNWstream =
-          FlinkGroupAlsoByWindowWrapper.create(context.getPipelineOptions(),
-              context.getInput(transform), groupByKStream, combineFn, 
outputKvCoder);
-
-      context.setOutputDataStream(context.getOutput(transform), 
groupedByKNWstream);
-    }
-  }
-
-  public static class FlattenPCollectionTranslator<T> implements 
FlinkStreamingPipelineTranslator.StreamTransformTranslator<Flatten.FlattenPCollectionList<T>>
 {
-
-    @Override
-    public void translateNode(Flatten.FlattenPCollectionList<T> transform, 
FlinkStreamingTranslationContext context) {
-      List<PCollection<T>> allInputs = context.getInput(transform).getAll();
-      DataStream<T> result = null;
-      for (PCollection<T> collection : allInputs) {
-        DataStream<T> current = context.getInputDataStream(collection);
-        result = (result == null) ? current : result.union(current);
-      }
-      context.setOutputDataStream(context.getOutput(transform), result);
-    }
-  }
-
-  public static class ParDoBoundMultiStreamingTranslator<IN, OUT> implements 
FlinkStreamingPipelineTranslator.StreamTransformTranslator<ParDo.BoundMulti<IN, 
OUT>> {
-
-    private final int MAIN_TAG_INDEX = 0;
-
-    @Override
-    public void translateNode(ParDo.BoundMulti<IN, OUT> transform, 
FlinkStreamingTranslationContext context) {
-
-      // we assume that the transformation does not change the windowing 
strategy.
-      WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy = 
context.getInput(transform).getWindowingStrategy();
-
-      Map<TupleTag<?>, PCollection<?>> outputs = 
context.getOutput(transform).getAll();
-      Map<TupleTag<?>, Integer> tagsToLabels = transformTupleTagsToLabels(
-          transform.getMainOutputTag(), outputs.keySet());
-
-      UnionCoder intermUnionCoder = getIntermUnionCoder(outputs.values());
-      WindowedValue.WindowedValueCoder<RawUnionValue> outputStreamCoder = 
WindowedValue.getFullCoder(
-          intermUnionCoder, windowingStrategy.getWindowFn().windowCoder());
-
-      CoderTypeInformation<WindowedValue<RawUnionValue>> 
intermWindowedValueCoder =
-          new CoderTypeInformation<>(outputStreamCoder);
-
-      FlinkParDoBoundMultiWrapper<IN, OUT> doFnWrapper = new 
FlinkParDoBoundMultiWrapper<>(
-          context.getPipelineOptions(), windowingStrategy, transform.getFn(),
-          transform.getMainOutputTag(), tagsToLabels);
-
-      DataStream<WindowedValue<IN>> inputDataStream = 
context.getInputDataStream(context.getInput(transform));
-      SingleOutputStreamOperator<WindowedValue<RawUnionValue>> 
intermDataStream =
-          
inputDataStream.flatMap(doFnWrapper).returns(intermWindowedValueCoder);
-
-      for (Map.Entry<TupleTag<?>, PCollection<?>> output : outputs.entrySet()) 
{
-        final int outputTag = tagsToLabels.get(output.getKey());
-
-        WindowedValue.WindowedValueCoder<?> coderForTag = 
WindowedValue.getFullCoder(
-            output.getValue().getCoder(),
-            windowingStrategy.getWindowFn().windowCoder());
-
-        CoderTypeInformation<WindowedValue<?>> windowedValueCoder =
-            new CoderTypeInformation(coderForTag);
-
-        context.setOutputDataStream(output.getValue(),
-            intermDataStream.filter(new 
FilterFunction<WindowedValue<RawUnionValue>>() {
-              @Override
-              public boolean filter(WindowedValue<RawUnionValue> value) throws 
Exception {
-                return value.getValue().getUnionTag() == outputTag;
-              }
-            }).flatMap(new FlatMapFunction<WindowedValue<RawUnionValue>, 
WindowedValue<?>>() {
-              @Override
-              public void flatMap(WindowedValue<RawUnionValue> value, 
Collector<WindowedValue<?>> collector) throws Exception {
-                collector.collect(WindowedValue.of(
-                    value.getValue().getValue(),
-                    value.getTimestamp(),
-                    value.getWindows(),
-                    value.getPane()));
-              }
-            }).returns(windowedValueCoder));
-      }
-    }
-
-    private Map<TupleTag<?>, Integer> transformTupleTagsToLabels(TupleTag<?> 
mainTag, Set<TupleTag<?>> secondaryTags) {
-      Map<TupleTag<?>, Integer> tagToLabelMap = Maps.newHashMap();
-      tagToLabelMap.put(mainTag, MAIN_TAG_INDEX);
-      int count = MAIN_TAG_INDEX + 1;
-      for (TupleTag<?> tag : secondaryTags) {
-        if (!tagToLabelMap.containsKey(tag)) {
-          tagToLabelMap.put(tag, count++);
-        }
-      }
-      return tagToLabelMap;
-    }
-
-    private UnionCoder getIntermUnionCoder(Collection<PCollection<?>> 
taggedCollections) {
-      List<Coder<?>> outputCoders = Lists.newArrayList();
-      for (PCollection<?> coll : taggedCollections) {
-        outputCoders.add(coll.getCoder());
-      }
-      return UnionCoder.of(outputCoders);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java
deleted file mode 100644
index f6bdecd..0000000
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-import com.google.cloud.dataflow.sdk.values.PValue;
-import com.google.common.base.Preconditions;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class FlinkStreamingTranslationContext {
-
-  private final StreamExecutionEnvironment env;
-  private final PipelineOptions options;
-
-  /**
-   * Keeps a mapping between the output value of the PTransform (in Dataflow) 
and the
-   * Flink Operator that produced it, after the translation of the 
correspondinf PTransform
-   * to its Flink equivalent.
-   * */
-  private final Map<PValue, DataStream<?>> dataStreams;
-
-  private AppliedPTransform<?, ?, ?> currentTransform;
-
-  public FlinkStreamingTranslationContext(StreamExecutionEnvironment env, 
PipelineOptions options) {
-    this.env = Preconditions.checkNotNull(env);
-    this.options = Preconditions.checkNotNull(options);
-    this.dataStreams = new HashMap<>();
-  }
-
-  public StreamExecutionEnvironment getExecutionEnvironment() {
-    return env;
-  }
-
-  public PipelineOptions getPipelineOptions() {
-    return options;
-  }
-
-  @SuppressWarnings("unchecked")
-  public <T> DataStream<T> getInputDataStream(PValue value) {
-    return (DataStream<T>) dataStreams.get(value);
-  }
-
-  public void setOutputDataStream(PValue value, DataStream<?> set) {
-    if (!dataStreams.containsKey(value)) {
-      dataStreams.put(value, set);
-    }
-  }
-
-  /**
-   * Sets the AppliedPTransform which carries input/output.
-   * @param currentTransform
-   */
-  public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) 
{
-    this.currentTransform = currentTransform;
-  }
-
-  @SuppressWarnings("unchecked")
-  public <I extends PInput> I getInput(PTransform<I, ?> transform) {
-    return (I) currentTransform.getInput();
-  }
-
-  @SuppressWarnings("unchecked")
-  public <O extends POutput> O getOutput(PTransform<?, O> transform) {
-    return (O) currentTransform.getOutput();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCoGroupKeyedListAggregator.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCoGroupKeyedListAggregator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCoGroupKeyedListAggregator.java
deleted file mode 100644
index d5562b8..0000000
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCoGroupKeyedListAggregator.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
-import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResultSchema;
-import com.google.cloud.dataflow.sdk.transforms.join.RawUnionValue;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.util.Collector;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
-public class FlinkCoGroupKeyedListAggregator<K,V1,V2> implements 
CoGroupFunction<KV<K,V1>, KV<K,V2>, KV<K, CoGbkResult>>{
-
-  private CoGbkResultSchema schema;
-  private TupleTag<?> tupleTag1;
-  private TupleTag<?> tupleTag2;
-
-  public FlinkCoGroupKeyedListAggregator(CoGbkResultSchema schema, TupleTag<?> 
tupleTag1, TupleTag<?> tupleTag2) {
-    this.schema = schema;
-    this.tupleTag1 = tupleTag1;
-    this.tupleTag2 = tupleTag2;
-  }
-
-  @Override
-  public void coGroup(Iterable<KV<K,V1>> first, Iterable<KV<K,V2>> second, 
Collector<KV<K, CoGbkResult>> out) throws Exception {
-    K k = null;
-    List<RawUnionValue> result = new ArrayList<>();
-    int index1 = schema.getIndex(tupleTag1);
-    for (KV<K,?> entry : first) {
-      k = entry.getKey();
-      result.add(new RawUnionValue(index1, entry.getValue()));
-    }
-    int index2 = schema.getIndex(tupleTag2);
-    for (KV<K,?> entry : second) {
-      k = entry.getKey();
-      result.add(new RawUnionValue(index2, entry.getValue()));
-    }
-    out.collect(KV.of(k, new CoGbkResult(schema, result)));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCreateFunction.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCreateFunction.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCreateFunction.java
deleted file mode 100644
index 56af397..0000000
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCreateFunction.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import org.apache.beam.runners.flink.translation.types.VoidCoderTypeSerializer;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.util.Collector;
-
-import java.io.ByteArrayInputStream;
-import java.util.List;
-
-/**
- * This is a hack for transforming a {@link 
com.google.cloud.dataflow.sdk.transforms.Create}
- * operation. Flink does not allow {@code null} in it's equivalent operation:
- * {@link 
org.apache.flink.api.java.ExecutionEnvironment#fromElements(Object[])}. 
Therefore
- * we use a DataSource with one dummy element and output the elements of the 
Create operation
- * inside this FlatMap.
- */
-public class FlinkCreateFunction<IN, OUT> implements FlatMapFunction<IN, OUT> {
-
-  private final List<byte[]> elements;
-  private final Coder<OUT> coder;
-
-  public FlinkCreateFunction(List<byte[]> elements, Coder<OUT> coder) {
-    this.elements = elements;
-    this.coder = coder;
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public void flatMap(IN value, Collector<OUT> out) throws Exception {
-
-    for (byte[] element : elements) {
-      ByteArrayInputStream bai = new ByteArrayInputStream(element);
-      OUT outValue = coder.decode(bai, Coder.Context.OUTER);
-      if (outValue == null) {
-        // TODO Flink doesn't allow null values in records
-        out.collect((OUT) VoidCoderTypeSerializer.VoidValue.INSTANCE);
-      } else {
-        out.collect(outValue);
-      }
-    }
-
-    out.close();
-  }
-}

Reply via email to