BEAM-2575 ApexRunner doesn't emit watermarks for additional outputs
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e014db6b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e014db6b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e014db6b Branch: refs/heads/DSL_SQL Commit: e014db6b7af00b49467389854c63ef693819ec1f Parents: eee0c9c Author: Thomas Weise <[email protected]> Authored: Sun Jul 9 11:57:43 2017 -0700 Committer: Tyler Akidau <[email protected]> Committed: Wed Jul 12 20:01:01 2017 -0700 ---------------------------------------------------------------------- .../operators/ApexParDoOperator.java | 21 +++++++++++++------- .../runners/apex/examples/WordCountTest.java | 8 ++++++-- 2 files changed, 20 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e014db6b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java index 809ca2a..c3cbab2 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java @@ -359,10 +359,7 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements } } if (sideInputs.isEmpty()) { - if (traceTuples) { - LOG.debug("\nemitting watermark {}\n", mark); - } - output.emit(mark); + outputWatermark(mark); return; } @@ -370,10 +367,20 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements Math.min(pushedBackWatermark.get(), currentInputWatermark); if (potentialOutputWatermark > currentOutputWatermark) { currentOutputWatermark = potentialOutputWatermark; - if (traceTuples) { - LOG.debug("\nemitting watermark {}\n", currentOutputWatermark); + outputWatermark(ApexStreamTuple.WatermarkTuple.of(currentOutputWatermark)); + } + } + + private void outputWatermark(ApexStreamTuple.WatermarkTuple<?> mark) { + if (traceTuples) { + LOG.debug("\nemitting {}\n", mark); + } + output.emit(mark); + if (!additionalOutputPortMapping.isEmpty()) { + for (DefaultOutputPort<ApexStreamTuple<?>> additionalOutput : + additionalOutputPortMapping.values()) { + additionalOutput.emit(mark); } - output.emit(ApexStreamTuple.WatermarkTuple.of(currentOutputWatermark)); } } http://git-wip-us.apache.org/repos/asf/beam/blob/e014db6b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java index e76096e..ba75746 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java @@ -123,11 +123,15 @@ public class WordCountTest { options.setInputFile(new File(inputFile).getAbsolutePath()); String outputFilePrefix = "target/wordcountresult.txt"; options.setOutput(outputFilePrefix); - WordCountTest.main(TestPipeline.convertToArgs(options)); File outFile1 = new File(outputFilePrefix + "-00000-of-00002"); File outFile2 = new File(outputFilePrefix + "-00001-of-00002"); - Assert.assertTrue(outFile1.exists() && outFile2.exists()); + Assert.assertTrue(!outFile1.exists() || outFile1.delete()); + Assert.assertTrue(!outFile2.exists() || outFile2.delete()); + + WordCountTest.main(TestPipeline.convertToArgs(options)); + + Assert.assertTrue("result files exist", outFile1.exists() && outFile2.exists()); HashSet<String> results = new HashSet<>(); results.addAll(FileUtils.readLines(outFile1)); results.addAll(FileUtils.readLines(outFile2));
