Repository: incubator-beam Updated Branches: refs/heads/master 1cd64bb1a -> 2ffecfda2
[BEAM-270] remove CoGroupByKey translation artifacts We used to have an optimization for the CoGroupByKey operation with two inputs. This is no longer the cases after changes to the batch execution in BEAM-270. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9706438e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9706438e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9706438e Branch: refs/heads/master Commit: 9706438e3a1988c05ab16f9c14912af958ef875a Parents: 36a27f5 Author: Maximilian Michels <[email protected]> Authored: Mon May 30 14:42:57 2016 +0200 Committer: Maximilian Michels <[email protected]> Committed: Mon May 30 14:47:36 2016 +0200 ---------------------------------------------------------------------- .../translation/FlinkBatchPipelineTranslator.java | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9706438e/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java index a19f29d..8f9a37a 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java @@ -22,8 +22,6 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.TransformTreeNode; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.join.CoGroupByKey; -import org.apache.beam.sdk.values.PValue; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @@ -134,19 +132,7 @@ public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator { return null; } - BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform); - - // No translator known - if (translator == null) { - return null; - } - - // We actually only specialize CoGroupByKey when exactly 2 inputs - if (transform instanceof CoGroupByKey && node.getInput().expand().size() != 2) { - return null; - } - - return translator; + return FlinkBatchTransformTranslators.getTranslator(transform); } private static String formatNodeName(TransformTreeNode node) {
