Repository: beam Updated Branches: refs/heads/master 7402d7600 -> bf9d45429
Fix Flink RunnableOnService tests * Check that a Multi-Output map contains the Tag, not the TaggedPValue * Return inputs from getInputs; don't return outputs. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a361b65d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a361b65d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a361b65d Branch: refs/heads/master Commit: a361b65d6aa56d70769403d884abf48d1e1141a4 Parents: 7402d76 Author: Thomas Groh <[email protected]> Authored: Tue Jan 24 17:41:07 2017 -0800 Committer: Dan Halperin <[email protected]> Committed: Wed Jan 25 09:03:23 2017 -0800 ---------------------------------------------------------------------- .../runners/flink/translation/FlinkBatchTransformTranslators.java | 2 +- .../flink/translation/FlinkStreamingTranslationContext.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/a361b65d/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java index 654b464..f7f1878 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java @@ -580,7 +580,7 @@ class FlinkBatchTransformTranslators { outputMap.put(transform.getMainOutputTag(), 0); int count = 1; for (TaggedPValue taggedValue : outputs) { - if (!outputMap.containsKey(taggedValue)) { + if 
(!outputMap.containsKey(taggedValue.getTag())) { outputMap.put(taggedValue.getTag(), count++); } } http://git-wip-us.apache.org/repos/asf/beam/blob/a361b65d/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java index 6db252e..7932f68 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java @@ -107,7 +107,7 @@ public class FlinkStreamingTranslationContext { } public <T extends PInput> List<TaggedPValue> getInputs(PTransform<T, ?> transform) { - return currentTransform.getOutputs(); + return currentTransform.getInputs(); } @SuppressWarnings("unchecked")
