Merge commit 'refs/pull/480/head' of https://github.com/apache/apex-malhar
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/a017dfaa Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/a017dfaa Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/a017dfaa Branch: refs/heads/master Commit: a017dfaa45e74e270b4d5619f83994388ed6dc09 Parents: a1c319c 65488fd Author: Thomas Weise <[email protected]> Authored: Sun Mar 19 07:39:52 2017 -0700 Committer: Thomas Weise <[email protected]> Committed: Sun Mar 19 07:39:52 2017 -0700 ---------------------------------------------------------------------- .../malhar/stream/sample/MinimalWordCount.java | 2 +- .../malhar/stream/sample/WindowedWordCount.java | 2 +- .../stream/sample/complete/AutoComplete.java | 2 +- .../sample/complete/StreamingWordExtract.java | 2 +- .../sample/complete/TopWikipediaSessions.java | 2 +- .../stream/sample/complete/TrafficRoutes.java | 2 +- .../sample/complete/TwitterAutoComplete.java | 2 +- .../sample/cookbook/CombinePerKeyExamples.java | 2 +- .../stream/sample/cookbook/DeDupExample.java | 2 +- .../sample/cookbook/MaxPerKeyExamples.java | 2 +- .../stream/sample/cookbook/TriggerExample.java | 2 +- .../lib/function/AnnonymousClassModifier.java | 134 +++++++ .../apex/malhar/lib/function/Function.java | 87 +++++ .../malhar/lib/function/FunctionOperator.java | 378 +++++++++++++++++++ .../malhar/lib/utils/ByteArrayClassLoader.java | 54 +++ .../apache/apex/malhar/lib/utils/TupleUtil.java | 46 +++ .../apex/malhar/stream/api/ApexStream.java | 2 +- .../apex/malhar/stream/api/WindowedStream.java | 5 +- .../malhar/stream/api/function/Function.java | 88 ----- .../malhar/stream/api/impl/ApexStreamImpl.java | 6 +- .../stream/api/impl/ApexWindowedStreamImpl.java | 2 +- .../api/operator/AnnonymousClassModifier.java | 134 ------- .../api/operator/ByteArrayClassLoader.java | 54 --- .../stream/api/operator/FunctionOperator.java | 378 ------------------- 
.../apex/malhar/stream/api/util/TupleUtil.java | 46 --- .../FunctionOperator/FunctionOperatorTest.java | 4 +- .../stream/sample/ApplicationWithStreamAPI.java | 2 +- .../LocalTestWithoutStreamApplication.java | 2 +- .../apex/malhar/stream/sample/MyStream.java | 2 +- .../apex/malhar/stream/sample/MyStreamTest.java | 2 +- .../stream/sample/WordCountWithStreamAPI.java | 2 +- 31 files changed, 723 insertions(+), 727 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a017dfaa/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java ---------------------------------------------------------------------- diff --cc examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java index 327c882,0000000..160175f mode 100644,000000..100644 --- a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java +++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java @@@ -1,128 -1,0 +1,128 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.stream.sample; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + ++import org.apache.apex.malhar.lib.function.Function; +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.WindowOption; - import org.apache.apex.malhar.stream.api.function.Function; +import org.apache.apex.malhar.stream.api.impl.StreamFactory; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.util.KeyValPair; + +import static org.apache.apex.malhar.stream.api.Option.Options.name; + +/** + * Beam MinimalWordCount Example + * + * @since 3.5.0 + */ +@ApplicationAnnotation(name = "MinimalWordCount") +public class MinimalWordCount implements StreamingApplication +{ + public static class Collector extends BaseOperator + { + static Map<String, Long> result; + private static boolean done = false; + + public static boolean isDone() + { + return done; + } + + @Override + public void setup(Context.OperatorContext context) + { + done = false; + result = new HashMap<>(); + } + + public final transient DefaultInputPort<KeyValPair<String, Long>> input = new DefaultInputPort<KeyValPair<String, Long>>() + { + @Override + public void process(KeyValPair<String, Long> tuple) + { + if (tuple.getKey().equals("bye")) { + done = true; + } + result.put(tuple.getKey(), tuple.getValue()); + } + }; + } + + /** + * Populate the dag using High-Level API. 
+ * @param dag + * @param conf + */ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + Collector collector = new Collector(); + // Create a stream reading from a file line by line using StreamFactory. + StreamFactory.fromFolder("./src/test/resources/wordcount", name("textInput")) + // Use a flatmap transformation to extract words from the incoming stream of lines. + .flatMap(new Function.FlatMapFunction<String, String>() + { + @Override + public Iterable<String> f(String input) + { + return Arrays.asList(input.split("[^a-zA-Z']+")); + + } + }, name("ExtractWords")) + // Apply windowing to the stream for counting, in this case, the window option is global window. + .window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1)) + // Count the appearances of every word. + .countByKey(new Function.ToKeyValue<String, String, Long>() + { + @Override + public Tuple<KeyValPair<String, Long>> f(String input) + { + return new Tuple.PlainTuple<KeyValPair<String, Long>>(new KeyValPair<String, Long>(input, 1L)); + } + }, name("countByKey")) + // Format the counting result to a readable format by unwrapping the tuples. + .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, KeyValPair<String, Long>>() + { + @Override + public KeyValPair<String, Long> f(Tuple.WindowedTuple<KeyValPair<String, Long>> input) + { + return input.getValue(); + } + }, name("FormatResults")) + // Print the result. + .print(name("console")) + // Attach a collector to the stream to collect results. + .endWith(collector, collector.input, name("Collector")) + // populate the dag using the stream. 
+ .populateDag(dag); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a017dfaa/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java ---------------------------------------------------------------------- diff --cc examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java index 5b83bd0,0000000..6e57bfd mode 100644,000000..100644 --- a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java +++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java @@@ -1,290 -1,0 +1,290 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.stream.sample; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import org.joda.time.Duration; ++import org.apache.apex.malhar.lib.function.Function; +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.WindowOption; +import org.apache.apex.malhar.stream.api.ApexStream; +import org.apache.apex.malhar.stream.api.WindowedStream; - import org.apache.apex.malhar.stream.api.function.Function; +import org.apache.apex.malhar.stream.api.impl.StreamFactory; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; + +import com.google.common.base.Throwables; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.util.KeyValPair; + +import static org.apache.apex.malhar.stream.api.Option.Options.name; + +/** + * Beam WindowedWordCount Example. + * + * @since 3.5.0 + */ +@ApplicationAnnotation(name = "WindowedWordCount") +public class WindowedWordCount implements StreamingApplication +{ + static final int WINDOW_SIZE = 1; // Default window duration in minutes + + /** + * A input operator that reads from and output a file line by line to downstream with a time gap between + * every two lines. 
+ */ + public static class TextInput extends BaseOperator implements InputOperator + { + public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>(); + private boolean done = false; + + private transient BufferedReader reader; + + @Override + public void setup(Context.OperatorContext context) + { + done = false; + initReader(); + } + + private void initReader() + { + try { + InputStream resourceStream = this.getClass().getResourceAsStream("/wordcount/word.txt"); + reader = new BufferedReader(new InputStreamReader(resourceStream)); + } catch (Exception ex) { + throw Throwables.propagate(ex); + } + } + + @Override + public void teardown() + { + IOUtils.closeQuietly(reader); + } + + @Override + public void emitTuples() + { + if (!done) { + try { + String line = reader.readLine(); + if (line == null) { + done = true; + reader.close(); + } else { + this.output.emit(line); + } + Thread.sleep(50); + } catch (IOException ex) { + throw new RuntimeException(ex); + } catch (InterruptedException e) { + throw Throwables.propagate(e); + } + } + } + } + + public static class Collector extends BaseOperator + { + private static Map<KeyValPair<Long, String>, Long> result = new HashMap<>(); + private static boolean done = false; + + @Override + public void setup(Context.OperatorContext context) + { + super.setup(context); + done = false; + } + + public static boolean isDone() + { + return done; + } + + public static Map<KeyValPair<Long, String>, Long> getResult() + { + return result; + } + + public final transient DefaultInputPort<PojoEvent> input = new DefaultInputPort<PojoEvent>() + { + @Override + public void process(PojoEvent tuple) + { + result.put(new KeyValPair<Long, String>(tuple.getTimestamp(), tuple.getWord()), tuple.getCount()); + if (tuple.getWord().equals("bye")) { + done = true; + } + } + }; + } + + /** + * A Pojo Tuple class used for outputting result to JDBC. 
+ */ + public static class PojoEvent + { + private String word; + private long count; + private long timestamp; + + @Override + public String toString() + { + return "PojoEvent (word=" + getWord() + ", count=" + getCount() + ", timestamp=" + getTimestamp() + ")"; + } + + public String getWord() + { + return word; + } + + public void setWord(String word) + { + this.word = word; + } + + public long getCount() + { + return count; + } + + public void setCount(long count) + { + this.count = count; + } + + public long getTimestamp() + { + return timestamp; + } + + public void setTimestamp(long timestamp) + { + this.timestamp = timestamp; + } + } + + /** + * A map function that wrap the input string with a random generated timestamp. + */ + public static class AddTimestampFn implements Function.MapFunction<String, Tuple.TimestampedTuple<String>> + { + private static final Duration RAND_RANGE = Duration.standardMinutes(10); + private final Long minTimestamp; + + AddTimestampFn() + { + this.minTimestamp = System.currentTimeMillis(); + } + + @Override + public Tuple.TimestampedTuple<String> f(String input) + { + // Generate a timestamp that falls somewhere in the past two hours. + long randMillis = (long)(Math.random() * RAND_RANGE.getMillis()); + long randomTimestamp = minTimestamp + randMillis; + + return new Tuple.TimestampedTuple<>(randomTimestamp, input); + } + } + + /** A MapFunction that converts a Word and Count into a PojoEvent. */ + public static class FormatAsTableRowFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, PojoEvent> + { + @Override + public PojoEvent f(Tuple.WindowedTuple<KeyValPair<String, Long>> input) + { + PojoEvent row = new PojoEvent(); + row.setTimestamp(input.getTimestamp()); + row.setCount(input.getValue().getValue()); + row.setWord(input.getValue().getKey()); + return row; + } + } + + /** + * Populate dag with High-Level API. 
+ * @param dag + * @param conf + */ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + TextInput input = new TextInput(); + Collector collector = new Collector(); + + // Create stream from the TextInput operator. + ApexStream<Tuple.TimestampedTuple<String>> stream = StreamFactory.fromInput(input, input.output, name("input")) + + // Extract all the words from the input line of text. + .flatMap(new Function.FlatMapFunction<String, String>() + { + @Override + public Iterable<String> f(String input) + { + return Arrays.asList(input.split("[\\p{Punct}\\s]+")); + } + }, name("ExtractWords")) + + // Wrap the word with a randomly generated timestamp. + .map(new AddTimestampFn(), name("AddTimestampFn")); + + + // apply window and trigger option. + // TODO: change trigger option to atWaterMark when available. + WindowedStream<Tuple.TimestampedTuple<String>> windowedWords = stream + .window(new WindowOption.TimeWindows(Duration.standardMinutes(WINDOW_SIZE)), + new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1)); + + + WindowedStream<PojoEvent> wordCounts = + // Perform a countByKey transformation to count the appearance of each word in every time window. + windowedWords.countByKey(new Function.ToKeyValue<Tuple.TimestampedTuple<String>, String, Long>() + { + @Override + public Tuple<KeyValPair<String, Long>> f(Tuple.TimestampedTuple<String> input) + { + return new Tuple.TimestampedTuple<KeyValPair<String, Long>>(input.getTimestamp(), + new KeyValPair<String, Long>(input.getValue(), 1L)); + } + }, name("count words")) + + // Format the output and print out the result. 
+ .map(new FormatAsTableRowFn(), name("FormatAsTableRowFn")).print(name("console")); + + wordCounts.endWith(collector, collector.input, name("Collector")).populateDag(dag); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a017dfaa/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java ---------------------------------------------------------------------- diff --cc examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java index 2db59b6,0000000..571a25f mode 100644,000000..100644 --- a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java +++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java @@@ -1,324 -1,0 +1,324 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.stream.sample.complete; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + ++import org.apache.apex.malhar.lib.function.Function; +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.Window; +import org.apache.apex.malhar.lib.window.WindowOption; +import org.apache.apex.malhar.stream.api.ApexStream; +import org.apache.apex.malhar.stream.api.CompositeStreamTransform; +import org.apache.apex.malhar.stream.api.WindowedStream; - import org.apache.apex.malhar.stream.api.function.Function; +import org.apache.apex.malhar.stream.api.impl.StreamFactory; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; + +import com.google.common.base.Throwables; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.util.KeyValPair; + +import static org.apache.apex.malhar.stream.api.Option.Options.name; + +/** + * An example that computes the most popular hash tags + * for every prefix, which can be used for auto-completion. + * This application is identical to TwitterAutoComplete, except it's + * reading from a file. This application is mainly for local testing + * purpose. + * + * <p>This will update the datastore every 10 seconds based on the last + * 30 minutes of data received. 
+ * + * @since 3.5.0 + */ +@ApplicationAnnotation(name = "AutoComplete") +public class AutoComplete implements StreamingApplication +{ + + /** + * A dummy Twitter input operator. It reads from a text file containing some tweets and output a line every + * half of a second. + */ + public static class TweetsInput extends BaseOperator implements InputOperator + { + public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>(); + private boolean done; + + private transient BufferedReader reader; + + @Override + public void setup(OperatorContext context) + { + done = false; + initReader(); + } + + private void initReader() + { + try { + InputStream resourceStream = this.getClass().getResourceAsStream("/sampletweets.txt"); + reader = new BufferedReader(new InputStreamReader(resourceStream)); + } catch (Exception ex) { + throw Throwables.propagate(ex); + } + } + + @Override + public void teardown() + { + IOUtils.closeQuietly(reader); + } + + @Override + public void emitTuples() + { + if (!done) { + try { + String line = reader.readLine(); + if (line == null) { + done = true; + reader.close(); + } else { + this.output.emit(line); + } + Thread.sleep(50); + } catch (IOException ex) { + throw new RuntimeException(ex); + } catch (InterruptedException e) { + // Ignore it. 
+ } + } + } + } + + public static class Collector extends BaseOperator + { + private static Map<String, List<CompletionCandidate>> result = new HashMap<>(); + private static boolean done = false; + + public static boolean isDone() + { + return done; + } + + @Override + public void setup(OperatorContext context) + { + super.setup(context); + done = false; + } + + public static Map<String, List<CompletionCandidate>> getResult() + { + return result; + } + + public final transient DefaultInputPort<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> input = new DefaultInputPort<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>() + { + @Override + public void process(Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>> tuple) + { + if (tuple.getValue().getKey().equals("yarn")) { + done = true; + } + result.put(tuple.getValue().getKey(), tuple.getValue().getValue()); + } + }; + } + + /** + * FlapMap Function to extract all hashtags from a text form tweet. + */ + private static class ExtractHashtags implements Function.FlatMapFunction<String, String> + { + + @Override + public Iterable<String> f(String input) + { + List<String> result = new LinkedList<>(); + Matcher m = Pattern.compile("#\\S+").matcher(input); + while (m.find()) { + result.add(m.group().substring(1)); + } + return result; + } + } + + /** + * Lower latency, but more expensive. 
+ */ + private static class ComputeTopFlat + extends CompositeStreamTransform<WindowedStream<CompletionCandidate>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>> + { + private final int candidatesPerPrefix; + private final int minPrefix; + + public ComputeTopFlat(int candidatesPerPrefix, int minPrefix) + { + this.candidatesPerPrefix = candidatesPerPrefix; + this.minPrefix = minPrefix; + } + + @Override + public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose( + WindowedStream<CompletionCandidate> input) + { + return input + .<KeyValPair<String, CompletionCandidate>, WindowedStream<KeyValPair<String, CompletionCandidate>>>flatMap(new AllPrefixes(minPrefix)) + .accumulateByKey(new TopNByKey(), new Function.ToKeyValue<KeyValPair<String, CompletionCandidate>, String, + CompletionCandidate>() + { + @Override + public Tuple<KeyValPair<String, CompletionCandidate>> f(KeyValPair<String, CompletionCandidate> tuple) + { + // TODO: Should be removed after Auto-wrapping is supported. 
+ return new Tuple.WindowedTuple<>(Window.GlobalWindow.INSTANCE, tuple); + } + }); + } + } + + /** + * FlapMap Function to extract all prefixes of the hashtag in the input CompletionCandidate, and output + * KeyValPairs of the prefix and the CompletionCandidate + */ + private static class AllPrefixes implements Function.FlatMapFunction<CompletionCandidate, KeyValPair<String, CompletionCandidate>> + { + private final int minPrefix; + private final int maxPrefix; + + public AllPrefixes() + { + this(0, Integer.MAX_VALUE); + } + + public AllPrefixes(int minPrefix) + { + this(minPrefix, Integer.MAX_VALUE); + } + + public AllPrefixes(int minPrefix, int maxPrefix) + { + this.minPrefix = minPrefix; + this.maxPrefix = maxPrefix; + } + + @Override + public Iterable<KeyValPair<String, CompletionCandidate>> f(CompletionCandidate input) + { + List<KeyValPair<String, CompletionCandidate>> result = new LinkedList<>(); + String word = input.getValue(); + for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) { + + result.add(new KeyValPair<>(input.getValue().substring(0, i).toLowerCase(), input)); + } + return result; + } + } + + /** + * A Composite stream transform that takes as input a list of tokens and returns + * the most common tokens per prefix. 
+ */ + public static class ComputeTopCompletions + extends CompositeStreamTransform<WindowedStream<String>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>> + { + private final int candidatesPerPrefix; + private final boolean recursive; + + protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive) + { + this.candidatesPerPrefix = candidatesPerPrefix; + this.recursive = recursive; + } + + public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive) + { + return new ComputeTopCompletions(candidatesPerPrefix, recursive); + } + + @Override + @SuppressWarnings("unchecked") + public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(WindowedStream<String> inputStream) + { + ApexStream<CompletionCandidate> candidates = inputStream + .countByKey(new Function.ToKeyValue<String, String, Long>() + { + @Override + public Tuple<KeyValPair<String, Long>> f(String input) + { + return new Tuple.PlainTuple<>(new KeyValPair<>(input, 1L)); + } + }, name("countByKey")) + .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String,Long>>, CompletionCandidate>() + { + @Override + public CompletionCandidate f(Tuple.WindowedTuple<KeyValPair<String, Long>> input) + { + return new CompletionCandidate(input.getValue().getKey(), input.getValue().getValue()); + } + }, name("ToCompletionCandidate")); + + return candidates.addCompositeStreams(new ComputeTopFlat(10, 1)); + + } + } + + /** + * Populate the dag with High-Level API. 
+ * @param dag + * @param conf + */ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + TweetsInput input = new TweetsInput(); + Collector collector = new Collector(); + + WindowOption windowOption = new WindowOption.GlobalWindow(); + + ApexStream<String> tags = StreamFactory.fromInput(input, input.output, name("tweetSampler")) + .flatMap(new ExtractHashtags()); + + tags.window(windowOption, new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1)) + .addCompositeStreams(ComputeTopCompletions.top(10, true)).print(name("console")) + .endWith(collector, collector.input, name("collector")) + .populateDag(dag); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a017dfaa/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java ---------------------------------------------------------------------- diff --cc examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java index 07f01d0,0000000..b5e491e mode 100644,000000..100644 --- a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java +++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java @@@ -1,162 -1,0 +1,162 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.sample.complete; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + ++import org.apache.apex.malhar.lib.function.Function; +import org.apache.apex.malhar.stream.api.ApexStream; +import org.apache.apex.malhar.stream.api.Option; - import org.apache.apex.malhar.stream.api.function.Function; +import org.apache.apex.malhar.stream.api.impl.StreamFactory; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.lib.db.jdbc.JdbcFieldInfo; +import com.datatorrent.lib.db.jdbc.JdbcPOJOInsertOutputOperator; +import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore; + +import static java.sql.Types.VARCHAR; + +/** + * Beam StreamingWordExtract Example. + * + * @since 3.5.0 + */ +@ApplicationAnnotation(name = "StreamingWordExtract") +public class StreamingWordExtract implements StreamingApplication +{ + private static int wordCount = 0; // A counter to count number of words have been extracted. + private static int entriesMapped = 0; // A counter to count number of entries have been mapped. + + public int getWordCount() + { + return wordCount; + } + + public int getEntriesMapped() + { + return entriesMapped; + } + + /** + * A MapFunction that tokenizes lines of text into individual words. 
+ */ + public static class ExtractWords implements Function.FlatMapFunction<String, String> + { + @Override + public Iterable<String> f(String input) + { + List<String> result = new ArrayList<>(Arrays.asList(input.split("[^a-zA-Z0-9']+"))); + wordCount += result.size(); + return result; + } + } + + + /** + * A MapFunction that uppercases a word. + */ + public static class Uppercase implements Function.MapFunction<String, String> + { + @Override + public String f(String input) + { + return input.toUpperCase(); + } + } + + + /** + * A filter function to filter out empty strings. + */ + public static class EmptyStringFilter implements Function.FilterFunction<String> + { + @Override + public boolean f(String input) + { + return !input.isEmpty(); + } + } + + + /** + * A map function to map the result string to a pojo entry. + */ + public static class PojoMapper implements Function.MapFunction<String, Object> + { + + @Override + public Object f(String input) + { + PojoEvent pojo = new PojoEvent(); + pojo.setStringValue(input); + entriesMapped++; + return pojo; + } + } + + /** + * Add field infos to the {@link JdbcPOJOInsertOutputOperator}. + */ + private static List<JdbcFieldInfo> addFieldInfos() + { + List<JdbcFieldInfo> fieldInfos = new ArrayList<>(); + fieldInfos.add(new JdbcFieldInfo("STRINGVALUE", "stringValue", JdbcFieldInfo.SupportType.STRING, VARCHAR)); + return fieldInfos; + } + + /** + * Populate dag with High-Level API. + * @param dag + * @param conf + */ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + JdbcPOJOInsertOutputOperator jdbcOutput = new JdbcPOJOInsertOutputOperator(); + jdbcOutput.setFieldInfos(addFieldInfos()); + JdbcTransactionalStore outputStore = new JdbcTransactionalStore(); + jdbcOutput.setStore(outputStore); + jdbcOutput.setTablename("TestTable"); + + // Create a stream reading from a folder. 
+ ApexStream<String> stream = StreamFactory.fromFolder("./src/test/resources/data"); + + // Extract all the words from the input line of text. + stream.flatMap(new ExtractWords()) + + // Filter out the empty strings. + .filter(new EmptyStringFilter()) + + // Change every word to uppercase. + .map(new Uppercase()) + + // Map the resulted word to a Pojo entry. + .map(new PojoMapper()) + + // Output the entries to JdbcOutput and insert them into a table. + .endWith(jdbcOutput, jdbcOutput.input, Option.Options.name("jdbcOutput")); + + stream.populateDag(dag); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a017dfaa/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java ---------------------------------------------------------------------- diff --cc examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java index 68ec733,0000000..b2b9ae4 mode 100644,000000..100644 --- a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java +++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java @@@ -1,347 -1,0 +1,347 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.sample.complete; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Random; +import javax.annotation.Nullable; + +import org.joda.time.Duration; + ++import org.apache.apex.malhar.lib.function.Function; +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.Window; +import org.apache.apex.malhar.lib.window.WindowOption; +import org.apache.apex.malhar.lib.window.accumulation.TopN; +import org.apache.apex.malhar.stream.api.ApexStream; +import org.apache.apex.malhar.stream.api.CompositeStreamTransform; +import org.apache.apex.malhar.stream.api.WindowedStream; - import org.apache.apex.malhar.stream.api.function.Function; +import org.apache.apex.malhar.stream.api.impl.StreamFactory; +import org.apache.hadoop.conf.Configuration; + + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.util.KeyValPair; + +import static org.apache.apex.malhar.stream.api.Option.Options.name; + +/** + * Beam's TopWikipediaSessions Example. + * + * @since 3.5.0 + */ +@ApplicationAnnotation(name = "TopWikipediaSessions") +public class TopWikipediaSessions implements StreamingApplication +{ + /** + * A generator that outputs a stream of combinations of some users and some randomly generated edit time. 
+ */ + public static class SessionGen extends BaseOperator implements InputOperator + { + private String[] names = new String[]{"user1", "user2", "user3", "user4"}; + public transient DefaultOutputPort<KeyValPair<String, Long>> output = new DefaultOutputPort<>(); + + private static final Duration RAND_RANGE = Duration.standardDays(365); + private Long minTimestamp; + private long sleepTime; + private static int tupleCount = 0; + + /** + * Returns the number of tuples emitted since {@link #setup} last reset the shared counter. + */ + public static int getTupleCount() + { + return tupleCount; + } + + /** + * Picks a random entry from the given array of names. + */ + private String randomName(String[] names) + { + int index = new Random().nextInt(names.length); + return names[index]; + } + + /** + * Resets the tuple counter, records the current time as the lower bound for generated + * timestamps and reads the pause between tuples from the SPIN_MILLIS attribute. + */ + @Override + public void setup(Context.OperatorContext context) + { + super.setup(context); + tupleCount = 0; + minTimestamp = System.currentTimeMillis(); + sleepTime = context.getValue(Context.OperatorContext.SPIN_MILLIS); + } + + /** + * Emits one (user, timestamp) pair whose timestamp lies within RAND_RANGE after minTimestamp, + * then sleeps for sleepTime milliseconds. + */ + @Override + public void emitTuples() + { + long randMillis = (long)(Math.random() * RAND_RANGE.getMillis()); + long randomTimestamp = minTimestamp + randMillis; + output.emit(new KeyValPair<String, Long>(randomName(names), randomTimestamp)); + tupleCount++; + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + // Restore the interrupt status instead of swallowing it, so that the thread + // running this operator can still observe the interruption. + Thread.currentThread().interrupt(); + } + } + } + + /** + * Keeps the most recently received results (at most resultSize of them) in a static list. + */ + public static class Collector extends BaseOperator + { + private final int resultSize = 5; + private static List<List<TempWrapper>> result = new ArrayList<>(); + + /** + * Returns the shared list of collected results (the live list, not a copy). + */ + public static List<List<TempWrapper>> getResult() + { + return result; + } + + public final transient DefaultInputPort<Tuple.WindowedTuple<List<TempWrapper>>> input = new DefaultInputPort<Tuple.WindowedTuple<List<TempWrapper>>>() + { + @Override + public void process(Tuple.WindowedTuple<List<TempWrapper>> tuple) + { + // Drop the oldest entry once the list is full, then append the new value. + if (result.size() == resultSize) { + result.remove(0); + } + result.add(tuple.getValue()); + } + }; + } + + + /** + * Convert the upstream (user, time) combination to a timestamped tuple of user.
+ */ + static class ExtractUserAndTimestamp implements Function.MapFunction<KeyValPair<String, Long>, Tuple.TimestampedTuple<String>> + { + @Override + public Tuple.TimestampedTuple<String> f(KeyValPair<String, Long> input) + { + // The edit time becomes the implicit timestamp field that windowing operates on. + final long editTime = input.getValue(); + return new Tuple.TimestampedTuple<>(editTime, input.getKey()); + } + } + + /** + * Counts the edits in each user session. Edits belong to the same session as long as + * each one is separated from the next by less than an hour. + */ + static class ComputeSessions + extends CompositeStreamTransform<ApexStream<Tuple.TimestampedTuple<String>>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, Long>>>> + { + @Override + public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, Long>>> compose(ApexStream<Tuple.TimestampedTuple<String>> inputStream) + { + // Chunk the stream into session windows with a one hour gap. + final WindowOption.SessionWindows sessionWindows = new WindowOption.SessionWindows(Duration.standardHours(1)); + final TriggerOption earlyAccumulating = new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1); + + return inputStream + .window(sessionWindows, earlyAccumulating) + + // Every edit contributes a count of one for its user within the session. + .countByKey(new Function.ToKeyValue<Tuple.TimestampedTuple<String>, String, Long>() + { + @Override + public Tuple.TimestampedTuple<KeyValPair<String, Long>> f(Tuple.TimestampedTuple<String> edit) + { + final long editTime = edit.getTimestamp(); + final KeyValPair<String, Long> singleEdit = new KeyValPair<String, Long>(edit.getValue(), 1L); + return new Tuple.TimestampedTuple<KeyValPair<String, Long>>(editTime, singleEdit); + } + }, name("ComputeSessions")); + } + } + + /** + * Orders TempWrapper objects by the Long value of the key value pair they wrap. + */ + public static class Comp implements Comparator<TempWrapper> + { + @Override + public int compare(TempWrapper o1, TempWrapper o2) + { + final long first = o1.getValue().getValue(); + final long second = o2.getValue().getValue(); + return Long.compare(first, second); + } + } + + /** + * A function to extract timestamp from a TempWrapper object.
+ */ + // TODO: Need to revisit and change back to using TimestampedTuple. + public static class TimestampExtractor implements com.google.common.base.Function<TempWrapper, Long> + { + @Override + public Long apply(@Nullable TempWrapper input) + { + // NOTE(review): input is annotated @Nullable but is dereferenced without a null check, + // so a null input results in a NullPointerException. + return input.getTimestamp(); + } + } + + /** + * A temporary wrapper that bundles a KeyValPair with a timestamp to represent a timestamped tuple. It exists + * because of a type conflict that could not be resolved when calling accumulate(). Once that issue is resolved, + * this class can be removed. + */ + public static class TempWrapper + { + private KeyValPair<String, Long> value; + private Long timestamp; + + /** + * Creates an empty wrapper; value and timestamp stay null until they are set. + */ + public TempWrapper() + { + + } + + /** + * Creates a wrapper around the given key value pair and timestamp. + */ + public TempWrapper(KeyValPair<String, Long> value, Long timestamp) + { + this.value = value; + this.timestamp = timestamp; + } + + /** + * Formats the wrapper as the value followed by " - " and the timestamp. + */ + @Override + public String toString() + { + return this.value + " - " + this.timestamp; + } + + public Long getTimestamp() + { + return timestamp; + } + + public void setTimestamp(Long timestamp) + { + this.timestamp = timestamp; + } + + public KeyValPair<String, Long> getValue() + { + return value; + } + + public void setValue(KeyValPair<String, Long> value) + { + this.value = value; + } + } + + /** + * Computes the longest session ending in each month, in this case we use 30 days to represent every month. + */ + private static class TopPerMonth + extends CompositeStreamTransform<ApexStream<Tuple.WindowedTuple<KeyValPair<String, Long>>>, WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>>> + { + + @Override + public WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>> compose(ApexStream<Tuple.WindowedTuple<KeyValPair<String, Long>>> inputStream) + { + // TopN accumulation configured to keep 10 entries, compared with Comp. + TopN<TempWrapper> topN = new TopN<>(); + topN.setN(10); + topN.setComparator(new Comp()); + + return inputStream + + // Map the input WindowedTuple to a TempWrapper object.
+ .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, TempWrapper>() + { + @Override + public TempWrapper f(Tuple.WindowedTuple<KeyValPair<String, Long>> input) + { + Window window = input.getWindows().iterator().next(); + return new TempWrapper(input.getValue(), window.getBeginTimestamp()); + } + }, name("TempWrapper")) + + // Apply window and trigger option again, this time chunk the stream into fixed time windows of 30 days. + .window(new WindowOption.TimeWindows(Duration.standardDays(30)), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(5))) + + // Compute the top 10 user-sessions with the highest number of edits. + .accumulate(topN, name("TopN")).with("timestampExtractor", new TimestampExtractor()); + } + } + + /** + * A map function that combines the user and his/her edit session into a string and uses that string as the key, + * with the number of edits in that session as the value, of a new key value pair that is sent downstream. + */ + static class SessionsToStringsDoFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, Tuple.WindowedTuple<KeyValPair<String, Long>>> + { + @Override + public Tuple.WindowedTuple<KeyValPair<String, Long>> f(Tuple.WindowedTuple<KeyValPair<String, Long>> input) + { + // Only the first window of the tuple is used; the new key is "user : window begin : window duration". + Window window = input.getWindows().iterator().next(); + return new Tuple.WindowedTuple<KeyValPair<String, Long>>(window, new KeyValPair<String, Long>( + input.getValue().getKey() + " : " + window.getBeginTimestamp() + " : " + window.getDurationMillis(), + input.getValue().getValue())); + } + } + + /** + * A flatmap function that turns the result into readable format.
+ */ + static class FormatOutputDoFn implements Function.FlatMapFunction<Tuple.WindowedTuple<List<TempWrapper>>, String> + { + @Override + public Iterable<String> f(Tuple.WindowedTuple<List<TempWrapper>> input) + { + final List<String> lines = new ArrayList<>(); + for (TempWrapper wrapper : input.getValue()) { + final KeyValPair<String, Long> sessionCount = wrapper.getValue(); + final String session = sessionCount.getKey(); + final long edits = sessionCount.getValue(); + // The window is looked up per element, so it is only touched when there is something to format. + final Window window = input.getWindows().iterator().next(); + lines.add(session + " + " + edits + " : " + window.getBeginTimestamp()); + } + return lines; + } + } + + /** + * A composite transform that computes the top wikipedia sessions by chaining timestamp extraction, + * session counting, session-to-string conversion and the top-per-month accumulation. + */ + public static class ComputeTopSessions extends CompositeStreamTransform<ApexStream<KeyValPair<String, Long>>, WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>>> + { + @Override + public WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>> compose(ApexStream<KeyValPair<String, Long>> inputStream) + { + return inputStream + .map(new ExtractUserAndTimestamp(), name("ExtractUserAndTimestamp")) + .addCompositeStreams(new ComputeSessions()) + .map(new SessionsToStringsDoFn(), name("SessionsToStringsDoFn")) + .addCompositeStreams(new TopPerMonth()); + } + } + + /** + * Wires the session generator through ComputeTopSessions to the console and the collector. + */ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + final SessionGen generator = new SessionGen(); + final Collector sink = new Collector(); + StreamFactory.fromInput(generator, generator.output, name("sessionGen")) + .addCompositeStreams(new ComputeTopSessions()) + .print(name("console")) + .endWith(sink, sink.input, name("collector")).populateDag(dag); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a017dfaa/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java ---------------------------------------------------------------------- diff --cc examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java index e6a53d6,0000000..431263a mode 100644,000000..100644 ---
a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java +++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java @@@ -1,523 -1,0 +1,523 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.stream.sample.complete; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.List; +import java.util.Map; + +import javax.annotation.Nullable; + +import org.joda.time.Duration; + ++import org.apache.apex.malhar.lib.function.Function; +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.WindowOption; +import org.apache.apex.malhar.lib.window.accumulation.Group; +import org.apache.apex.malhar.stream.api.ApexStream; +import org.apache.apex.malhar.stream.api.CompositeStreamTransform; +import org.apache.apex.malhar.stream.api.WindowedStream; - import org.apache.apex.malhar.stream.api.function.Function; +import org.apache.apex.malhar.stream.api.impl.StreamFactory; +import org.apache.hadoop.conf.Configuration; + +import com.google.common.collect.Lists; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.util.KeyValPair; + +import static org.apache.apex.malhar.stream.api.Option.Options.name; + +/** + * Beam's TrafficRoutes example. + * + * @since 3.5.0 + */ +@ApplicationAnnotation(name = "TrafficRoutes") +public class TrafficRoutes implements StreamingApplication +{ + /** + * Maps a sensor station ID to the name of the route it is tracked under; filled by buildStationInfo(). + */ + static Map<String, String> sdStations = buildStationInfo(); + static final int WINDOW_DURATION = 3; // Default sliding window duration in minutes + static final int WINDOW_SLIDE_EVERY = 1; // Default window 'slide every' setting in minutes + + /** + * This class holds information about a station reading's average speed.
+ */ + public static class StationSpeed implements Comparable<StationSpeed> + { + @Nullable + String stationId; + @Nullable + Double avgSpeed; + @Nullable + Long timestamp; + + /** + * Creates an empty reading; all fields stay null until they are set. + */ + public StationSpeed() {} + + public StationSpeed(String stationId, Double avgSpeed, Long timestamp) + { + this.stationId = stationId; + this.avgSpeed = avgSpeed; + this.timestamp = timestamp; + } + + public void setAvgSpeed(@Nullable Double avgSpeed) + { + this.avgSpeed = avgSpeed; + } + + public void setStationId(@Nullable String stationId) + { + this.stationId = stationId; + } + + public void setTimestamp(@Nullable Long timestamp) + { + this.timestamp = timestamp; + } + + @Nullable + public Long getTimestamp() + { + return timestamp; + } + + public String getStationId() + { + return this.stationId; + } + + public Double getAvgSpeed() + { + return this.avgSpeed; + } + + /** + * Orders readings by timestamp. The timestamp is nullable, so a reading without a timestamp + * sorts before any reading that has one instead of failing while unboxing. + */ + @Override + public int compareTo(StationSpeed other) + { + if (this.timestamp == null) { + return other.timestamp == null ? 0 : -1; + } + if (other.timestamp == null) { + return 1; + } + return Long.compare(this.timestamp, other.timestamp); + } + } + + /** + * This class holds information about a route's speed/slowdown. + */ + static class RouteInfo + { + @Nullable + String route; + @Nullable + Double avgSpeed; + @Nullable + Boolean slowdownEvent; + + /** + * Creates an empty route info; all fields stay null. + */ + public RouteInfo() + { + + } + + public RouteInfo(String route, Double avgSpeed, Boolean slowdownEvent) + { + this.route = route; + this.avgSpeed = avgSpeed; + this.slowdownEvent = slowdownEvent; + } + + public String getRoute() + { + return this.route; + } + + public Double getAvgSpeed() + { + return this.avgSpeed; + } + + public Boolean getSlowdownEvent() + { + return this.slowdownEvent; + } + } + + /** + * Extract the timestamp field from the input string, and wrap the input string in a {@link Tuple.TimestampedTuple} + * with the extracted timestamp.
+ */ + static class ExtractTimestamps implements Function.MapFunction<String, Tuple.TimestampedTuple<String>> + { + + @Override + public Tuple.TimestampedTuple<String> f(String input) + { + // The timestamp is the first comma separated field of the reading. + final String rawTimestamp = tryParseTimestamp(input.split(",")); + final long eventTime = Long.parseLong(rawTimestamp); + return new Tuple.TimestampedTuple<>(eventTime, input); + } + } + + /** + * Keeps only the readings of stations that lie on one of the predefined 'routes' and emits + * them as (route, station speed) pairs. + */ + static class ExtractStationSpeedFn implements Function.FlatMapFunction<Tuple.TimestampedTuple<String>, KeyValPair<String, StationSpeed>> + { + + @Override + public Iterable<KeyValPair<String, StationSpeed>> f(Tuple.TimestampedTuple<String> input) + { + final List<KeyValPair<String, StationSpeed>> readings = new ArrayList<>(); + final String[] fields = input.getValue().split(","); + + // For this analysis, use only 'main line' station types + if (!"ML".equals(tryParseStationType(fields))) { + return readings; + } + + final Double speed = tryParseAvgSpeed(fields); + final String station = tryParseStationId(fields); + // For this simple example, filter out everything but some hardwired routes. + if (speed == null || station == null || !sdStations.containsKey(station)) { + return readings; + } + + // The tuple key is the 'route' name stored in the 'sdStations' hash. + final StationSpeed reading = new StationSpeed(station, speed, input.getTimestamp()); + final KeyValPair<String, StationSpeed> routeReading = new KeyValPair<>(sdStations.get(station), reading); + readings.add(routeReading); + return readings; + } + } + + /** + * For a given route, track average speed for the window. Calculate whether + * traffic is currently slowing down, via a predefined threshold. If a supermajority of + * speeds in this sliding window are less than the previous reading we call this a 'slowdown'. + * Note: these calculations are for example purposes only, and are unrealistic and oversimplified.
+ */ + static class GatherStats + implements Function.FlatMapFunction<Tuple.WindowedTuple<KeyValPair<String, List<StationSpeed>>>, Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> + { + @Override + public Iterable<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> f(Tuple.WindowedTuple<KeyValPair<String, List<StationSpeed>>> input) + { + final List<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> output = new ArrayList<>(); + final String routeName = input.getValue().getKey(); + + // Work on a copy of the readings, ordered by the timestamp embedded in each StationSpeed. + final List<StationSpeed> readings = Lists.newArrayList(input.getValue().getValue()); + Collections.sort(readings); + + double total = 0.0; + int counted = 0; + int faster = 0; + int notFaster = 0; + final Map<String, Double> latestByStation = new HashMap<>(); + for (StationSpeed reading : readings) { + final Double current = reading.getAvgSpeed(); + if (current == null) { + // Readings without a speed contribute neither to the average nor to the trend. + continue; + } + total += current; + counted++; + // put() hands back the station's previous speed, or null for its first reading. + final Double previous = latestByStation.put(reading.getStationId(), current); + if (previous != null) { + if (previous < current) { + faster++; + } else { + notFaster++; + } + } + } + + if (counted == 0) { + // No average to compute. + return output; + } + + final double average = total / counted; + // A slowdown is flagged when the non-increasing readings are at least twice the increasing ones. + final boolean slowdown = notFaster >= 2 * faster; + final RouteInfo info = new RouteInfo(routeName, average, slowdown); + final KeyValPair<String, RouteInfo> routeEntry = new KeyValPair<String, RouteInfo>(routeName, info); + output.add(new Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>(input.getTimestamp(), routeEntry)); + return output; + } + } + + /** + * Output Pojo class for outputting result to JDBC.
+ */ + static class OutputPojo + { + private Double avgSpeed; + private Boolean slowdownEvent; + private String key; + private Long timestamp; + + /** + * Creates an empty pojo; every field stays null until its setter is called. + */ + public OutputPojo() + { + } + + public OutputPojo(Double avgSpeed, Boolean slowdownEvent, String key, Long timestamp) + { + this.key = key; + this.timestamp = timestamp; + this.avgSpeed = avgSpeed; + this.slowdownEvent = slowdownEvent; + } + + /** + * Renders the pojo as key, average speed, slowdown flag and timestamp, joined by " + ". + */ + @Override + public String toString() + { + final String separator = " + "; + return key + separator + avgSpeed + separator + slowdownEvent + separator + timestamp; + } + + public Long getTimestamp() + { + return this.timestamp; + } + + public void setTimestamp(Long timestamp) + { + this.timestamp = timestamp; + } + + public Double getAvgSpeed() + { + return this.avgSpeed; + } + + public void setAvgSpeed(Double avgSpeed) + { + this.avgSpeed = avgSpeed; + } + + public String getKey() + { + return this.key; + } + + public void setKey(String key) + { + this.key = key; + } + + public Boolean getSlowdownEvent() + { + return this.slowdownEvent; + } + + public void setSlowdownEvent(Boolean slowdownEvent) + { + this.slowdownEvent = slowdownEvent; + } + + } + + /** + * Stores every received pojo in a static map from (timestamp, key) to (average speed, slowdown flag). + */ + public static class Collector extends BaseOperator + { + private static Map<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> result = new HashMap<>(); + + public static Map<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> getResult() + { + return result; + } + + public final transient DefaultInputPort<OutputPojo> input = new DefaultInputPort<OutputPojo>() + { + @Override + public void process(OutputPojo tuple) + { + final KeyValPair<Long, String> timeAndKey = new KeyValPair<Long, String>(tuple.getTimestamp(), tuple.getKey()); + final KeyValPair<Double, Boolean> speedAndFlag = new KeyValPair<Double, Boolean>(tuple.getAvgSpeed(), tuple.getSlowdownEvent()); + result.put(timeAndKey, speedAndFlag); + } + }; + } + + /** + * Format the results of the slowdown calculations to a OutputPojo.
+ */ + static class FormatStatsFn implements Function.MapFunction<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>, OutputPojo> + { + @Override + public OutputPojo f(Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>> input) + { + RouteInfo routeInfo = input.getValue().getValue(); + OutputPojo row = new OutputPojo(routeInfo.getAvgSpeed(), routeInfo.getSlowdownEvent(), input.getValue().getKey(), input.getTimestamp()); + return row; + } + } + + + /** + * This composite transformation extracts speed info from traffic station readings. + * It groups the readings by 'route' and analyzes traffic slowdown for that route. + * Lastly, it formats the results for JDBC. + */ + static class TrackSpeed extends + CompositeStreamTransform<WindowedStream<KeyValPair<String, StationSpeed>>, WindowedStream<OutputPojo>> + { + @Override + public WindowedStream<OutputPojo> compose(WindowedStream<KeyValPair<String, StationSpeed>> inputStream) + { + // Apply a GroupByKey transform to collect a list of all station + // readings for a given route. + WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<StationSpeed>>>> timeGroup = + inputStream + .accumulateByKey(new Group<StationSpeed>(), new Function.ToKeyValue<KeyValPair<String, StationSpeed>, String, StationSpeed>() + { + @Override + public Tuple<KeyValPair<String, StationSpeed>> f(KeyValPair<String, StationSpeed> input) + { + // The reading's own timestamp becomes the timestamp of the tuple. + return new Tuple.TimestampedTuple<>(input.getValue().getTimestamp(), input); + } + }, name("GroupByKey")); + + // Analyze 'slowdown' over the route readings. + WindowedStream<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> stats = timeGroup + .flatMap(new GatherStats(), name("GatherStats")); + + // Format the results for writing to JDBC table.
+ WindowedStream<OutputPojo> results = stats.map(new FormatStatsFn(), name("FormatStatsFn")); + + return results; + } + } + + + /** + * Parses the average speed (field 3) of a reading. + * + * @return the parsed speed, or null when the field is missing or is not a valid number + */ + private static Double tryParseAvgSpeed(String[] inputItems) + { + String rawSpeed = tryParseString(inputItems, 3); + if (rawSpeed == null) { + // Check for the missing field explicitly instead of catching the NullPointerException + // that Double.parseDouble(null) would throw. + return null; + } + try { + return Double.parseDouble(rawSpeed); + } catch (NumberFormatException e) { + return null; + } + } + + /** + * Returns the station type (field 2) of a reading, or null when the field is missing. + */ + private static String tryParseStationType(String[] inputItems) + { + return tryParseString(inputItems, 2); + } + + /** + * Returns the station ID (field 1) of a reading, or null when the field is missing. + */ + private static String tryParseStationId(String[] inputItems) + { + return tryParseString(inputItems, 1); + } + + /** + * Returns the timestamp (field 0) of a reading, or null when the field is missing. + */ + private static String tryParseTimestamp(String[] inputItems) + { + return tryParseString(inputItems, 0); + } + + /** + * Returns the field at the given index, or null when the array is null or has no such index. + * The bound check is strict (index < length): an array of length n has no element n, so + * accepting index == length would throw ArrayIndexOutOfBoundsException instead of returning null. + */ + private static String tryParseString(String[] inputItems, int index) + { + boolean present = inputItems != null && index >= 0 && index < inputItems.length; + return present ? inputItems[index] : null; + } + + /** + * Define some small hard-wired San Diego 'routes' to track based on sensor station ID. + */ + private static Map<String, String> buildStationInfo() + { + Map<String, String> stations = new Hashtable<String, String>(); + stations.put("1108413", "SDRoute1"); // from freeway 805 S + stations.put("1108699", "SDRoute2"); // from freeway 78 E + stations.put("1108702", "SDRoute2"); + return stations; + } + + /** + * A dummy generator to generate some traffic information.
+ */ + public static class InfoGen extends BaseOperator implements InputOperator + { + public transient DefaultOutputPort<String> output = new DefaultOutputPort<>(); + + private String[] stationTypes = new String[]{"ML", "BL", "GL"}; + private int[] stationIDs = new int[]{1108413, 1108699, 1108702}; + private double ave = 55.0; + private long timestamp; + private static final Duration RAND_RANGE = Duration.standardMinutes(10); + private static int tupleCount = 0; + + /** + * Returns the number of tuples emitted since setup last reset the shared counter. + */ + public static int getTupleCount() + { + return tupleCount; + } + + /** + * Resets the tuple counter and records the current time as the base for generated timestamps. + */ + @Override + public void setup(Context.OperatorContext context) + { + tupleCount = 0; + timestamp = System.currentTimeMillis(); + } + + /** + * Emits one "time,stationID,stationType,speed" line for every combination of station type and + * station ID, pausing 50 ms after each line. The speed is ave plus a random value below 20 and + * the time lies within RAND_RANGE after the timestamp recorded in setup. + */ + @Override + public void emitTuples() + { + for (String stationType : stationTypes) { + for (int stationID : stationIDs) { + double speed = Math.random() * 20 + ave; + long time = (long)(Math.random() * RAND_RANGE.getMillis()) + timestamp; + try { + output.emit(time + "," + stationID + "," + stationType + "," + speed); + tupleCount++; + + Thread.sleep(50); + } catch (InterruptedException e) { + // Restore the interrupt status so that the thread running this operator can still + // observe the interruption; a blanket catch alone would clear it silently. + Thread.currentThread().interrupt(); + } catch (Exception e) { + // Ignore it + } + } + } + } + } + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + InfoGen infoGen = new InfoGen(); + Collector collector = new Collector(); + + // Create a stream from the input operator. + ApexStream<Tuple.TimestampedTuple<String>> stream = StreamFactory.fromInput(infoGen, infoGen.output, name("infoGen")) + + // Extract the timestamp from the input and wrap it into a TimestampedTuple. + .map(new ExtractTimestamps(), name("ExtractTimestamps")); + + stream + // Extract the average speed of a station. + .flatMap(new ExtractStationSpeedFn(), name("ExtractStationSpeedFn")) + + // Apply window and trigger option.
+ .window(new WindowOption.SlidingTimeWindows(Duration.standardMinutes(WINDOW_DURATION), Duration.standardMinutes(WINDOW_SLIDE_EVERY)), new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(5000)).accumulatingFiredPanes()) + + // Apply TrackSpeed composite transformation to compute the route information. + .addCompositeStreams(new TrackSpeed()) + + // print the result to console. + .print(name("console")) + .endWith(collector, collector.input, name("Collector")) + .populateDag(dag); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a017dfaa/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java ---------------------------------------------------------------------- diff --cc examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java index 6332c66,0000000..cf52cb6 mode 100644,000000..100644 --- a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java +++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java @@@ -1,254 -1,0 +1,254 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.apex.malhar.stream.sample.complete; + +import java.nio.charset.Charset; +import java.nio.charset.CharsetEncoder; + +import java.util.LinkedList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.joda.time.Duration; + ++import org.apache.apex.malhar.lib.function.Function; +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.Window; +import org.apache.apex.malhar.lib.window.WindowOption; + +import org.apache.apex.malhar.stream.api.ApexStream; +import org.apache.apex.malhar.stream.api.CompositeStreamTransform; +import org.apache.apex.malhar.stream.api.WindowedStream; - import org.apache.apex.malhar.stream.api.function.Function; +import org.apache.apex.malhar.stream.api.impl.StreamFactory; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.contrib.twitter.TwitterSampleInput; +import com.datatorrent.lib.util.KeyValPair; + +import static org.apache.apex.malhar.stream.api.Option.Options.name; + +/** + * Auto Complete Hashtag Example with real time twitter input. In order to run this application, you need to create an app + * at https://apps.twitter.com, then generate your consumer and access keys and tokens, and enter those information + * accordingly in /resources/META-INF/properties.xml. + * + * The authentication requires following 4 information. + * Your application consumer key, + * Your application consumer secret, + * Your twitter access token, and + * Your twitter access token secret. + * + * @since 3.5.0 + */ +@ApplicationAnnotation(name = "TwitterAutoComplete") +public class TwitterAutoComplete implements StreamingApplication +{ + /** + * Check whether every character in a string is ASCII encoding. 
+ */ + public static class StringUtils + { + // NOTE(review): CharsetEncoder instances are documented as not safe for concurrent use; + // this shared encoder presumably relies on isAscii being called from one thread at a time - confirm. + static CharsetEncoder encoder = Charset.forName("US-ASCII").newEncoder(); + + public static boolean isAscii(String v) + { + return encoder.canEncode(v); + } + } + + /** + * FlatMap Function to extract all hashtags from a text form tweet. + */ + private static class ExtractHashtags implements Function.FlatMapFunction<String, String> + { + // Compiled once and shared: Pattern instances are immutable and safe to reuse. + private static final Pattern HASHTAG = Pattern.compile("#\\S+"); + + @Override + public Iterable<String> f(String input) + { + List<String> result = new LinkedList<>(); + Matcher m = HASHTAG.matcher(input); + while (m.find()) { + // Drop the leading '#'. + result.add(m.group().substring(1)); + } + return result; + } + } + + /** + * Lower latency, but more expensive. + */ + private static class ComputeTopFlat + extends CompositeStreamTransform<WindowedStream<CompletionCandidate>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>> + { + private final int candidatesPerPrefix; + private final int minPrefix; + + public ComputeTopFlat(int candidatesPerPrefix, int minPrefix) + { + this.candidatesPerPrefix = candidatesPerPrefix; + this.minPrefix = minPrefix; + } + + @Override + public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose( + WindowedStream<CompletionCandidate> input) + { + TopNByKey topNByKey = new TopNByKey(); + topNByKey.setN(candidatesPerPrefix); + // Prefixes are generated from minPrefix up to at most 3 characters. + return input + .<KeyValPair<String, CompletionCandidate>, WindowedStream<KeyValPair<String, CompletionCandidate>>>flatMap(new AllPrefixes(minPrefix, 3), name("Extract Prefixes")) + .accumulateByKey(topNByKey, new Function.ToKeyValue<KeyValPair<String, CompletionCandidate>, String, CompletionCandidate>() + { + @Override + public Tuple<KeyValPair<String, CompletionCandidate>> f(KeyValPair<String, CompletionCandidate> tuple) + { + // TODO: Should be removed after Auto-wrapping is supported.
+ return new Tuple.WindowedTuple<>(Window.GlobalWindow.INSTANCE, tuple); + } + }, name("TopNByKey")); + } + } + + /** + * FlatMap Function to extract all prefixes of the hashtag in the input CompletionCandidate, and output + * KeyValPairs of the lower-cased prefix and the CompletionCandidate. + */ + private static class AllPrefixes implements Function.FlatMapFunction<CompletionCandidate, KeyValPair<String, CompletionCandidate>> + { + private final int minPrefix; + private final int maxPrefix; + + public AllPrefixes() + { + this(0, Integer.MAX_VALUE); + } + + public AllPrefixes(int minPrefix) + { + this(minPrefix, Integer.MAX_VALUE); + } + + public AllPrefixes(int minPrefix, int maxPrefix) + { + this.minPrefix = minPrefix; + this.maxPrefix = maxPrefix; + } + + @Override + public Iterable<KeyValPair<String, CompletionCandidate>> f(CompletionCandidate input) + { + List<KeyValPair<String, CompletionCandidate>> result = new LinkedList<>(); + String word = input.getValue(); + // The upper bound does not change inside the loop, so compute it once. + int longestPrefix = Math.min(word.length(), maxPrefix); + for (int i = minPrefix; i <= longestPrefix; i++) { + // Locale.ROOT keeps the prefix keys independent of the JVM default locale. + result.add(new KeyValPair<>(word.substring(0, i).toLowerCase(java.util.Locale.ROOT), input)); + } + return result; + } + } + + /** + * A Composite stream transform that takes as input a list of tokens and returns + * the most common tokens per prefix.
+ */ + public static class ComputeTopCompletions + extends CompositeStreamTransform<WindowedStream<String>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>> + { + private final int candidatesPerPrefix; + private final boolean recursive; + + protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive) + { + this.candidatesPerPrefix = candidatesPerPrefix; + this.recursive = recursive; + } + + public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive) + { + return new ComputeTopCompletions(candidatesPerPrefix, recursive); + } + + @Override + @SuppressWarnings("unchecked") + public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(WindowedStream<String> inputStream) + { + + ApexStream<CompletionCandidate> candidates = inputStream + .countByKey(new Function.ToKeyValue<String, String, Long>() + { + @Override + public Tuple<KeyValPair<String, Long>> f(String input) + { + return new Tuple.PlainTuple<>(new KeyValPair<>(input, 1L)); + } + }, name("Hashtag Count")) + .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String,Long>>, CompletionCandidate>() + { + @Override + public CompletionCandidate f(Tuple.WindowedTuple<KeyValPair<String, Long>> input) + { + return new CompletionCandidate(input.getValue().getKey(), input.getValue().getValue()); + } + }, name("KeyValPair to CompletionCandidate")); + + return candidates.addCompositeStreams(new ComputeTopFlat(candidatesPerPrefix, 1)); + + } + } + + /** + * FilterFunction to filter out tweets with non-acsii characters. + */ + static class ASCIIFilter implements Function.FilterFunction<String> + { + @Override + public boolean f(String input) + { + return StringUtils.isAscii(input); + } + } + + /** + * Populate the dag with High-Level API. 
+ * @param dag + * @param conf + */ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + TwitterSampleInput input = new TwitterSampleInput(); + + WindowOption windowOption = new WindowOption.GlobalWindow(); + + ApexStream<String> tags = StreamFactory.fromInput(input, input.text, name("tweetSampler")) + .filter(new ASCIIFilter(), name("ACSII Filter")) + .flatMap(new ExtractHashtags(), name("Extract Hashtags")); + + ApexStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> s = + tags.window(windowOption, new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(10))) + .addCompositeStreams(ComputeTopCompletions.top(10, true)).print(); + + s.populateDag(dag); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a017dfaa/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java ---------------------------------------------------------------------- diff --cc examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java index bfdb268,0000000..937476e mode 100644,000000..100644 --- a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java +++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java @@@ -1,285 -1,0 +1,285 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.sample.cookbook; + +import java.util.ArrayList; +import java.util.List; + ++import org.apache.apex.malhar.lib.function.Function; +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.WindowOption; +import org.apache.apex.malhar.lib.window.accumulation.ReduceFn; +import org.apache.apex.malhar.stream.api.ApexStream; +import org.apache.apex.malhar.stream.api.CompositeStreamTransform; +import org.apache.apex.malhar.stream.api.WindowedStream; - import org.apache.apex.malhar.stream.api.function.Function; +import org.apache.apex.malhar.stream.api.impl.StreamFactory; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.util.KeyValPair; + +import static org.apache.apex.malhar.stream.api.Option.Options.name; + +/** + * An example that reads the public 'Shakespeare' data, and for each word in + * the dataset that is over a given length, generates a string containing the + * list of play names in which that word appears + * + * <p>Concepts: the combine transform, which lets you combine the values in a + * key-grouped Collection + * + * + * 
@since 3.5.0 + */ +@ApplicationAnnotation(name = "CombinePerKeyExamples") +public class CombinePerKeyExamples implements StreamingApplication +{ + // Use the shakespeare public BigQuery sample + private static final String SHAKESPEARE_TABLE = "publicdata:samples.shakespeare"; + // We'll track words >= this word length across all plays in the table. + private static final int MIN_WORD_LENGTH = 0; + + /** + * Examines each row in the input table. If the word is greater than or equal to MIN_WORD_LENGTH, + * outputs word, play_name. + */ + static class ExtractLargeWordsFn implements Function.MapFunction<SampleBean, KeyValPair<String, String>> + { + + @Override + public KeyValPair<String, String> f(SampleBean input) + { + String playName = input.getCorpus(); + String word = input.getWord(); + if (word.length() >= MIN_WORD_LENGTH) { + return new KeyValPair<>(word, playName); + } else { + return null; + } + } + } + + + /** + * Prepares the output data which is in same bean + */ + static class FormatShakespeareOutputFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, String>>, SampleBean> + { + @Override + public SampleBean f(Tuple.WindowedTuple<KeyValPair<String, String>> input) + { + return new SampleBean(input.getValue().getKey(), input.getValue().getValue()); + } + } + + /** + * A reduce function to concat two strings together. + */ + public static class Concat extends ReduceFn<String> + { + @Override + public String reduce(String input1, String input2) + { + return input1 + ", " + input2; + } + } + + /** + * Reads the public 'Shakespeare' data, and for each word in the dataset + * over a given length, generates a string containing the list of play names + * in which that word appears. 
+ */ + private static class PlaysForWord extends CompositeStreamTransform<ApexStream<SampleBean>, WindowedStream<SampleBean>> + { + + @Override + public WindowedStream<SampleBean> compose(ApexStream<SampleBean> inputStream) + { + return inputStream + // Extract words from the input SampleBeam stream. + .map(new ExtractLargeWordsFn(), name("ExtractLargeWordsFn")) + + // Apply window and trigger option to the streams. + .window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1)) + + // Apply reduceByKey transformation to concat the names of all the plays that a word has appeared in together. + .reduceByKey(new Concat(), new Function.ToKeyValue<KeyValPair<String,String>, String, String>() + { + @Override + public Tuple<KeyValPair<String, String>> f(KeyValPair<String, String> input) + { + return new Tuple.PlainTuple<KeyValPair<String, String>>(input); + } + }, name("Concat")) + + // Format the output back to a SampleBeam object. + .map(new FormatShakespeareOutputFn(), name("FormatShakespeareOutputFn")); + } + } + + + /** + * A Java Beam class that contains information about a word appears in a corpus written by Shakespeare. + */ + public static class SampleBean + { + + public SampleBean() + { + + } + + public SampleBean(String word, String corpus) + { + this.word = word; + this.corpus = corpus; + } + + @Override + public String toString() + { + return this.word + " : " + this.corpus; + } + + private String word; + + private String corpus; + + public void setWord(String word) + { + this.word = word; + } + + public String getWord() + { + return word; + } + + public void setCorpus(String corpus) + { + this.corpus = corpus; + } + + public String getCorpus() + { + return corpus; + } + } + + /** + * A dummy info generator to generate {@link SampleBean} objects to mimic reading from real 'Shakespeare' + * data. 
+ */ + public static class SampleInput extends BaseOperator implements InputOperator + { + + public final transient DefaultOutputPort<SampleBean> beanOutput = new DefaultOutputPort(); + private String[] words = new String[]{"A", "B", "C", "D", "E", "F", "G"}; + private String[] corpuses = new String[]{"1", "2", "3", "4", "5", "6", "7", "8"}; + private static int i; + + @Override + public void setup(Context.OperatorContext context) + { + super.setup(context); + i = 0; + } + + @Override + public void emitTuples() + { + while (i < 1) { + for (String word : words) { + for (String corpus : corpuses) { + try { + Thread.sleep(50); + beanOutput.emit(new SampleBean(word, corpus)); + } catch (Exception e) { + // Ignore it + } + } + } + i++; + } + + } + } + + public static class Collector extends BaseOperator + { + private static List<SampleBean> result; + private static boolean done = false; + + public static List<SampleBean> getResult() + { + return result; + } + + public static boolean isDone() + { + return done; + } + + @Override + public void setup(Context.OperatorContext context) + { + result = new ArrayList<>(); + done = false; + } + + public final transient DefaultInputPort<SampleBean> input = new DefaultInputPort<SampleBean>() + { + @Override + public void process(SampleBean tuple) + { + if (tuple.getWord().equals("F")) { + done = true; + } + result.add(tuple); + } + }; + } + + /** + * Populate dag using High-Level API. + * @param dag + * @param conf + */ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + SampleInput input = new SampleInput(); + Collector collector = new Collector(); + StreamFactory.fromInput(input, input.beanOutput, name("input")) + .addCompositeStreams(new PlaysForWord()) + .print(name("console")) + .endWith(collector, collector.input, name("Collector")) + .populateDag(dag); + + } +}
