Repository: apex-malhar Updated Branches: refs/heads/master 8f00cefa2 -> f006ac6f5
APEXMALHAR-2201 Suppressed console output in tests of Stream API. Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/f006ac6f Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/f006ac6f Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/f006ac6f Branch: refs/heads/master Commit: f006ac6f557340fe620839debec2f076e1e291af Parents: 8f00cef Author: Shunxin <[email protected]> Authored: Wed Aug 31 11:18:56 2016 -0700 Committer: Shunxin <[email protected]> Committed: Thu Sep 1 13:05:52 2016 -0700 ---------------------------------------------------------------------- .../malhar/stream/sample/MinimalWordCount.java | 2 +- .../malhar/stream/sample/WindowedWordCount.java | 53 ++++++++++++-------- .../stream/sample/complete/AutoComplete.java | 53 ++++++++++++-------- .../sample/complete/TopWikipediaSessions.java | 1 + .../stream/sample/complete/TrafficRoutes.java | 2 +- .../sample/cookbook/CombinePerKeyExamples.java | 30 +++++++---- .../stream/sample/cookbook/DeDupExample.java | 5 +- .../stream/sample/MinimalWordCountTest.java | 2 +- .../stream/sample/WindowedWordCountTest.java | 3 +- .../sample/complete/AutoCompleteTest.java | 3 +- .../complete/TopWikipediaSessionsTest.java | 1 + .../sample/complete/TrafficRoutesTest.java | 1 + .../cookbook/CombinePerKeyExamplesTest.java | 6 +-- .../sample/cookbook/DeDupExampleTest.java | 2 +- .../apex/malhar/stream/api/ApexStream.java | 6 +++ .../malhar/stream/api/impl/ApexStreamImpl.java | 9 ++++ .../stream/sample/ApplicationWithStreamAPI.java | 6 ++- .../sample/ApplicationWithStreamAPITest.java | 2 + .../apex/malhar/stream/sample/MyStreamTest.java | 2 +- 19 files changed, 124 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java index 21afc5b..03579ab 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java @@ -117,7 +117,7 @@ public class MinimalWordCount implements StreamingApplication } }, name("FormatResults")) // Print the result. - .print() + .print(name("console")) // Attach a collector to the stream to collect results. .endWith(collector, collector.input, name("Collector")) // populate the dag using the stream. http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java index c8a0e51..f020ddf 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java @@ -64,17 +64,11 @@ public class WindowedWordCount implements StreamingApplication */ public static class TextInput extends BaseOperator implements InputOperator { - private static boolean done = false; - public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>(); + private boolean done = false; private transient BufferedReader reader; - public static boolean isDone() - { - return done; - } - @Override public void setup(Context.OperatorContext context) { @@ -101,20 +95,21 @@ public class WindowedWordCount implements StreamingApplication @Override public void emitTuples() { - try { - String line = reader.readLine(); - if (line == null) { - done = true; - reader.close(); - Thread.sleep(1000); - } else { - this.output.emit(line); + if (!done) { + try { + String line = reader.readLine(); + if (line == null) { + done = true; + reader.close(); + } else { + this.output.emit(line); + } + Thread.sleep(50); + } catch (IOException ex) { + throw new RuntimeException(ex); + } catch (InterruptedException e) { + throw Throwables.propagate(e); } - Thread.sleep(50); - } catch (IOException ex) { - throw new RuntimeException(ex); - } catch (InterruptedException e) { - throw Throwables.propagate(e); } } } @@ -122,6 +117,19 @@ public class WindowedWordCount implements StreamingApplication public static class Collector extends BaseOperator { private static Map<KeyValPair<Long, String>, Long> result = new HashMap<>(); + private static boolean done = false; + + @Override + public void setup(Context.OperatorContext context) + { + super.setup(context); + done = false; + } + + public static boolean isDone() + { + return done; + } public static Map<KeyValPair<Long, String>, Long> getResult() { @@ -134,6 +142,9 @@ public class WindowedWordCount implements StreamingApplication public void process(PojoEvent tuple) { result.put(new KeyValPair<Long, String>(tuple.getTimestamp(), tuple.getWord()), tuple.getCount()); + if (tuple.getWord().equals("bye")) { + done = true; + } } }; } @@ -270,7 +281,7 @@ public class WindowedWordCount implements StreamingApplication }, name("count words")) // Format the output and print out the result. - .map(new FormatAsTableRowFn(), name("FormatAsTableRowFn")).print(); + .map(new FormatAsTableRowFn(), name("FormatAsTableRowFn")).print(name("console")); wordCounts.endWith(collector, collector.input, name("Collector")).populateDag(dag); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java index 00c40e7..7ac6621 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java @@ -75,16 +75,11 @@ public class AutoComplete implements StreamingApplication */ public static class TweetsInput extends BaseOperator implements InputOperator { - private static boolean done = false; public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>(); + private boolean done; private transient BufferedReader reader; - public static boolean isDone() - { - return done; - } - @Override public void setup(OperatorContext context) { @@ -111,20 +106,21 @@ public class AutoComplete implements StreamingApplication @Override public void emitTuples() { - try { - String line = reader.readLine(); - if (line == null) { - done = true; - reader.close(); - Thread.sleep(1000); - } else { - this.output.emit(line); + if (!done) { + try { + String line = reader.readLine(); + if (line == null) { + done = true; + reader.close(); + } else { + this.output.emit(line); + } + Thread.sleep(50); + } catch (IOException ex) { + throw new RuntimeException(ex); + } catch (InterruptedException e) { + // Ignore it. } - Thread.sleep(50); - } catch (IOException ex) { - throw new RuntimeException(ex); - } catch (InterruptedException e) { - // Ignore it. } } } @@ -132,6 +128,19 @@ public class AutoComplete implements StreamingApplication public static class Collector extends BaseOperator { private static Map<String, List<CompletionCandidate>> result = new HashMap<>(); + private static boolean done = false; + + public static boolean isDone() + { + return done; + } + + @Override + public void setup(OperatorContext context) + { + super.setup(context); + done = false; + } public static Map<String, List<CompletionCandidate>> getResult() { @@ -143,6 +152,9 @@ public class AutoComplete implements StreamingApplication @Override public void process(Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>> tuple) { + if (tuple.getValue().getKey().equals("yarn")) { + done = true; + } result.put(tuple.getValue().getKey(), tuple.getValue().getValue()); } }; @@ -303,7 +315,8 @@ public class AutoComplete implements StreamingApplication .flatMap(new ExtractHashtags()); tags.window(windowOption, new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1)) - .addCompositeStreams(ComputeTopCompletions.top(10, true)).endWith(collector, collector.input, name("collector")) + .addCompositeStreams(ComputeTopCompletions.top(10, true)).print(name("console")) + .endWith(collector, collector.input, name("collector")) .populateDag(dag); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java index d7d62fe..a697d52 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java @@ -335,6 +335,7 @@ public class TopWikipediaSessions implements StreamingApplication Collector collector = new Collector(); StreamFactory.fromInput(sg, sg.output, name("sessionGen")) .addCompositeStreams(new ComputeTopSessions()) + .print(name("console")) .endWith(collector, collector.input, name("collector")).populateDag(dag); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java index 3045238..08aa8c8 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java @@ -514,7 +514,7 @@ public class TrafficRoutes implements StreamingApplication .addCompositeStreams(new TrackSpeed()) // print the result to console. - .print() + .print(name("console")) .endWith(collector, collector.input, name("Collector")) .populateDag(dag); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java index 7c16521..653207a 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java @@ -201,11 +201,6 @@ public class CombinePerKeyExamples implements StreamingApplication private String[] corpuses = new String[]{"1", "2", "3", "4", "5", "6", "7", "8"}; private static int i; - public static int getI() - { - return i; - } - @Override public void setup(Context.OperatorContext context) { @@ -219,10 +214,10 @@ public class CombinePerKeyExamples implements StreamingApplication while (i < 1) { for (String word : words) { for (String corpus : corpuses) { - beanOutput.emit(new SampleBean(word, corpus)); try { - Thread.sleep(100); - } catch (InterruptedException e) { + Thread.sleep(50); + beanOutput.emit(new SampleBean(word, corpus)); + } catch (Exception e) { // Ignore it } } @@ -235,12 +230,24 @@ public class CombinePerKeyExamples implements StreamingApplication public static class Collector extends BaseOperator { - static List<SampleBean> result; + private static List<SampleBean> result; + private static boolean done = false; + + public static List<SampleBean> getResult() + { + return result; + } + + public static boolean isDone() + { + return done; + } @Override public void setup(Context.OperatorContext context) { result = new ArrayList<>(); + done = false; } public final transient DefaultInputPort<SampleBean> input = new DefaultInputPort<SampleBean>() @@ -248,6 +255,9 @@ public class CombinePerKeyExamples implements StreamingApplication @Override public void process(SampleBean tuple) { + if (tuple.getWord().equals("F")) { + done = true; + } result.add(tuple); } }; @@ -265,7 +275,7 @@ public class CombinePerKeyExamples implements StreamingApplication Collector collector = new Collector(); StreamFactory.fromInput(input, input.beanOutput, name("input")) .addCompositeStreams(new PlaysForWord()) - .print() + .print(name("console")) .endWith(collector, collector.input, name("Collector")) .populateDag(dag); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java index 0cd7c58..d13e2c3 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java @@ -117,8 +117,9 @@ public class DeDupExample implements StreamingApplication new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(1))) // Remove the duplicate words and print out the result. - .accumulate(new RemoveDuplicates<String>(), name("RemoveDuplicates")).print().endWith(collector, collector.input) - + .accumulate(new RemoveDuplicates<String>(), name("RemoveDuplicates")) + .print(name("console")) + .endWith(collector, collector.input) .populateDag(dag); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java index d32da72..c078683 100644 --- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java +++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java @@ -37,7 +37,7 @@ public class MinimalWordCountTest { LocalMode lma = LocalMode.newInstance(); Configuration conf = new Configuration(false); - + conf.set("dt.application.MinimalWordCount.operator.console.silent", "true"); MinimalWordCount app = new MinimalWordCount(); lma.prepareDAG(app, conf); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java index f6270d4..f0c51f6 100644 --- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java +++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java @@ -46,6 +46,7 @@ public class WindowedWordCountTest { LocalMode lma = LocalMode.newInstance(); Configuration conf = new Configuration(false); + conf.set("dt.application.WindowedWordCount.operator.console.silent", "true"); lma.prepareDAG(new WindowedWordCount(), conf); LocalMode.Controller lc = lma.getController(); ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>() @@ -53,7 +54,7 @@ public class WindowedWordCountTest @Override public Boolean call() throws Exception { - return WindowedWordCount.TextInput.isDone(); + return WindowedWordCount.Collector.isDone(); } }); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java index 97c5ad4..4ed2d5d 100644 --- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java +++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java @@ -39,6 +39,7 @@ public class AutoCompleteTest { LocalMode lma = LocalMode.newInstance(); Configuration conf = new Configuration(false); + conf.set("dt.application.AutoComplete.operator.console.silent", "true"); lma.prepareDAG(new AutoComplete(), conf); LocalMode.Controller lc = lma.getController(); @@ -47,7 +48,7 @@ public class AutoCompleteTest @Override public Boolean call() throws Exception { - return AutoComplete.TweetsInput.isDone(); + return AutoComplete.Collector.isDone(); } }); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java index c0dbaf4..fddf511 100644 --- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java +++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java @@ -39,6 +39,7 @@ public class TopWikipediaSessionsTest { LocalMode lma = LocalMode.newInstance(); Configuration conf = new Configuration(false); + conf.set("dt.application.TopWikipediaSessions.operator.console.silent", "true"); lma.prepareDAG(new TopWikipediaSessions(), conf); LocalMode.Controller lc = lma.getController(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java index c532898..766fa60 100644 --- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java +++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java @@ -41,6 +41,7 @@ public class TrafficRoutesTest { LocalMode lma = LocalMode.newInstance(); Configuration conf = new Configuration(false); + conf.set("dt.application.TrafficRoutes.operator.console.silent", "true"); lma.prepareDAG(new TrafficRoutes(), conf); LocalMode.Controller lc = lma.getController(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java index b130808..1e14fff 100644 --- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java +++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java @@ -35,7 +35,7 @@ public class CombinePerKeyExamplesTest { LocalMode lma = LocalMode.newInstance(); Configuration conf = new Configuration(false); - + conf.set("dt.application.CombinePerKeyExamples.operator.console.silent", "true"); CombinePerKeyExamples app = new CombinePerKeyExamples(); lma.prepareDAG(app, conf); @@ -46,11 +46,11 @@ public class CombinePerKeyExamplesTest @Override public Boolean call() throws Exception { - return CombinePerKeyExamples.SampleInput.getI() >= 1; + return CombinePerKeyExamples.Collector.isDone(); } }); lc.run(100000); - Assert.assertTrue(CombinePerKeyExamples.Collector.result.get(CombinePerKeyExamples.Collector.result.size() - 1).getCorpus().contains("1, 2, 3, 4, 5, 6, 7, 8")); + Assert.assertTrue(CombinePerKeyExamples.Collector.getResult().get(CombinePerKeyExamples.Collector.getResult().size() - 2).getCorpus().contains("1, 2, 3, 4, 5, 6, 7, 8")); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java index a175cd7..7f93f50 100644 --- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java +++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java @@ -38,7 +38,7 @@ public class DeDupExampleTest { LocalMode lma = LocalMode.newInstance(); Configuration conf = new Configuration(false); - + conf.set("dt.application.DeDupExample.operator.console.silent", "true"); DeDupExample app = new DeDupExample(); lma.prepareDAG(app, conf); LocalMode.Controller lc = lma.getController(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java index 6d44534..47f358f 100644 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java @@ -106,6 +106,12 @@ public interface ApexStream<T> * Add a stdout console output operator * @return stream itself */ + <STREAM extends ApexStream<T>> STREAM print(Option... opts); + + /** + * Add a stdout console output operator + * @return stream itself + */ <STREAM extends ApexStream<T>> STREAM print(); /** http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java index 032cb03..ba399de 100644 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java @@ -307,6 +307,15 @@ public class ApexStreamImpl<T> implements ApexStream<T> @Override @SuppressWarnings("unchecked") + public ApexStreamImpl<T> print(Option... opts) + { + ConsoleOutputOperator consoleOutputOperator = new ConsoleOutputOperator(); + addOperator(consoleOutputOperator, (Operator.InputPort<T>)consoleOutputOperator.input, null, opts); + return this; + } + + @Override + @SuppressWarnings("unchecked") public ApexStreamImpl<T> print() { ConsoleOutputOperator consoleOutputOperator = new ConsoleOutputOperator(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java ---------------------------------------------------------------------- diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java index f65806e..a39ff35 100644 --- a/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java +++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java @@ -35,6 +35,8 @@ import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; import com.datatorrent.lib.util.KeyValPair; +import static org.apache.apex.malhar.stream.api.Option.Options.name; + /** * An application example with stream api */ @@ -56,7 +58,7 @@ public class ApplicationWithStreamAPI implements StreamingApplication return Arrays.asList(input.split("[\\p{Punct}\\s]+")); } }); - stream.print(); + stream.print(name("WordOutput")); stream.window(new WindowOption.GlobalWindow(), new TriggerOption().withEarlyFiringsAtEvery(Duration .millis(1000)).accumulatingFiredPanes()).countByKey(new Function.ToKeyValue<String, String, Long>() { @@ -65,7 +67,7 @@ public class ApplicationWithStreamAPI implements StreamingApplication { return new Tuple.PlainTuple(new KeyValPair<>(input, 1L)); } - }).print(); + }).print(name("WCOutput")); stream.populateDag(dag); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPITest.java ---------------------------------------------------------------------- diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPITest.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPITest.java index 70f26f2..29a2070 100644 --- a/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPITest.java +++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPITest.java @@ -42,6 +42,8 @@ public class ApplicationWithStreamAPITest { LocalMode lma = LocalMode.newInstance(); Configuration conf = new Configuration(false); + conf.set("dt.application.WordCountStreamingApiDemo.operator.WCOutput.silent", "true"); + conf.set("dt.application.WordCountStreamingApiDemo.operator.WordOutput.silent", "true"); lma.prepareDAG(new ApplicationWithStreamAPI(), conf); LocalMode.Controller lc = lma.getController(); long start = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java ---------------------------------------------------------------------- diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java index 5e48974..d912117 100644 --- a/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java +++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java @@ -162,7 +162,7 @@ public class MyStreamTest { return new Tuple.PlainTuple(new KeyValPair<>(input, 1L)); } - }).addOperator(collector, collector.inputPort, collector.outputPort).print().runEmbedded(false, 30000, exitCondition); + }).addOperator(collector, collector.inputPort, collector.outputPort).runEmbedded(false, 30000, exitCondition); Map<String, Long> dataMap = new HashMap<>();
