Added Beam Examples and Implementations of Accumulation.
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/dcca7752 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/dcca7752 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/dcca7752 Branch: refs/heads/master Commit: dcca7752a8ee966d67602a1b7cb8fbacdb8ed59d Parents: 266b041 Author: Shunxin <[email protected]> Authored: Wed Aug 24 13:12:20 2016 -0700 Committer: Siyuan Hua <[email protected]> Committed: Thu Aug 25 09:26:03 2016 -0700 ---------------------------------------------------------------------- demos/highlevelapi/pom.xml | 98 +++- .../malhar/stream/sample/MinimalWordCount.java | 126 +++++ .../malhar/stream/sample/WindowedWordCount.java | 277 ++++++++++ .../stream/sample/complete/AutoComplete.java | 308 +++++++++++ .../sample/complete/CompletionCandidate.java | 87 ++++ .../stream/sample/complete/PojoEvent.java | 44 ++ .../sample/complete/StreamingWordExtract.java | 160 ++++++ .../stream/sample/complete/TopNByKey.java | 118 +++++ .../sample/complete/TopWikipediaSessions.java | 340 ++++++++++++ .../stream/sample/complete/TrafficRoutes.java | 521 +++++++++++++++++++ .../sample/complete/TwitterAutoComplete.java | 251 +++++++++ .../sample/cookbook/CombinePerKeyExamples.java | 212 ++++---- .../stream/sample/cookbook/DeDupExample.java | 124 +++++ .../stream/sample/cookbook/InputPojo.java | 76 +++ .../sample/cookbook/MaxPerKeyExamples.java | 203 ++++++++ .../stream/sample/cookbook/OutputPojo.java | 54 ++ .../stream/sample/cookbook/TriggerExample.java | 137 +++-- .../src/main/resources/META-INF/properties.xml | 141 +++++ .../stream/sample/MinimalWordCountTest.java | 61 +++ .../stream/sample/WindowedWordCountTest.java | 90 ++++ .../sample/complete/AutoCompleteTest.java | 66 +++ .../complete/StreamingWordExtractTest.java | 144 +++++ .../complete/TopWikipediaSessionsTest.java | 73 +++ .../sample/complete/TrafficRoutesTest.java | 66 +++ 
.../complete/TwitterAutoCompleteTest.java | 66 +++ .../cookbook/CombinePerKeyExamplesTest.java | 56 ++ .../sample/cookbook/DeDupExampleTest.java | 59 +++ .../sample/cookbook/MaxPerKeyExamplesTest.java | 210 ++++++++ .../src/test/resources/data/word.txt | 2 + .../src/test/resources/log4j.properties | 45 ++ .../src/test/resources/sampletweets.txt | 207 ++++++++ .../src/test/resources/wordcount/word.txt | 8 + demos/pom.xml | 13 +- .../lib/window/impl/accumulation/Average.java | 64 +++ .../lib/window/impl/accumulation/Count.java | 61 +++ .../lib/window/impl/accumulation/FoldFn.java | 65 +++ .../lib/window/impl/accumulation/Group.java | 63 +++ .../lib/window/impl/accumulation/Max.java | 75 +++ .../lib/window/impl/accumulation/Min.java | 76 +++ .../lib/window/impl/accumulation/ReduceFn.java | 65 +++ .../impl/accumulation/RemoveDuplicates.java | 72 +++ .../lib/window/impl/accumulation/SumDouble.java | 60 +++ .../lib/window/impl/accumulation/SumFloat.java | 60 +++ .../lib/window/impl/accumulation/SumInt.java | 60 +++ .../lib/window/impl/accumulation/SumLong.java | 60 +++ .../lib/window/impl/accumulation/TopN.java | 106 ++++ .../lib/window/impl/accumulation/TopNByKey.java | 114 ++++ .../window/impl/accumulation/AverageTest.java | 41 ++ .../window/impl/accumulation/FoldFnTest.java | 129 +++++ .../lib/window/impl/accumulation/GroupTest.java | 42 ++ .../lib/window/impl/accumulation/MaxTest.java | 53 ++ .../lib/window/impl/accumulation/MinTest.java | 53 ++ .../window/impl/accumulation/ReduceFnTest.java | 50 ++ .../impl/accumulation/RemoveDuplicatesTest.java | 42 ++ .../lib/window/impl/accumulation/SumTest.java | 57 ++ .../window/impl/accumulation/TopNByKeyTest.java | 75 +++ .../apex/malhar/stream/api/WindowedStream.java | 4 +- .../stream/api/impl/ApexWindowedStreamImpl.java | 11 +- .../stream/api/impl/accumulation/Count.java | 61 --- .../stream/api/impl/accumulation/FoldFn.java | 65 --- .../stream/api/impl/accumulation/ReduceFn.java | 65 --- 
.../stream/api/impl/accumulation/TopN.java | 107 ---- stream/src/test/resources/words/word.txt | 2 + 63 files changed, 5820 insertions(+), 481 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/pom.xml ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/pom.xml b/demos/highlevelapi/pom.xml index c669681..cde0c83 100644 --- a/demos/highlevelapi/pom.xml +++ b/demos/highlevelapi/pom.xml @@ -34,21 +34,107 @@ <version>3.5.0-SNAPSHOT</version> </parent> - <properties> - <skipTests>true</skipTests> - </properties> + <build> + <plugins> + + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>1.9.1</version> + <executions> + <execution> + <id>attach-artifacts</id> + <phase>package</phase> + <goals> + <goal>attach-artifact</goal> + </goals> + <configuration> + <artifacts> + <artifact> + <file>target/${project.artifactId}-${project.version}.apa</file> + <type>apa</type> + </artifact> + </artifacts> + <skipAttach>false</skipAttach> + </configuration> + </execution> + </executions> + </plugin> + + </plugins> + </build> <dependencies> <dependency> - <groupId>cglib</groupId> - <artifactId>cglib</artifactId> - <version>3.2.1</version> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.10</version> + <scope>test</scope> + </dependency> + <dependency> + <!-- required by twitter demo --> + <groupId>org.twitter4j</groupId> + <artifactId>twitter4j-core</artifactId> + <version>4.0.4</version> + </dependency> + <dependency> + <!-- required by twitter demo --> + <groupId>org.twitter4j</groupId> + <artifactId>twitter4j-stream</artifactId> + <version>4.0.4</version> + </dependency> + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-contrib</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + 
<groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-stream</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.hsqldb</groupId> + <artifactId>hsqldb</artifactId> + <version>2.3.1</version> + </dependency> + <dependency> + <groupId>com.h2database</groupId> + <artifactId>h2</artifactId> + <version>1.4.192</version> + <scope>test</scope> </dependency> <dependency> <groupId>joda-time</groupId> <artifactId>joda-time</artifactId> <version>2.9.1</version> </dependency> + <dependency> + <!--This dependency is needed for StreamingWordExtractTest--> + <groupId>org.codehaus.janino</groupId> + <artifactId>commons-compiler</artifactId> + <version>2.7.8</version> + <type>jar</type> + </dependency> + <dependency> + <!--This dependency is needed for StreamingWordExtractTest--> + <groupId>org.codehaus.janino</groupId> + <artifactId>janino</artifactId> + <version>2.7.8</version> + <scope>test</scope> + </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java new file mode 100644 index 0000000..671cc72 --- /dev/null +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.sample; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.WindowOption; +import org.apache.apex.malhar.stream.api.function.Function; +import org.apache.apex.malhar.stream.api.impl.StreamFactory; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.util.KeyValPair; + +import static org.apache.apex.malhar.stream.api.Option.Options.name; + +/** + * Beam MinimalWordCount Example + */ +@ApplicationAnnotation(name = "MinimalWordCount") +public class MinimalWordCount implements StreamingApplication +{ + public static class Collector extends BaseOperator + { + static Map<String, Long> result; + private static boolean done = false; + + public static boolean isDone() + { + return done; + } + + @Override + public void setup(Context.OperatorContext context) + { + done = 
false; + result = new HashMap<>(); + } + + public final transient DefaultInputPort<KeyValPair<String, Long>> input = new DefaultInputPort<KeyValPair<String, Long>>() + { + @Override + public void process(KeyValPair<String, Long> tuple) + { + if (tuple.getKey().equals("bye")) { + done = true; + } + result.put(tuple.getKey(), tuple.getValue()); + } + }; + } + + /** + * Populate the dag using High-Level API. + * @param dag + * @param conf + */ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + Collector collector = new Collector(); + // Create a stream reading from a file line by line using StreamFactory. + StreamFactory.fromFolder("./src/test/resources/wordcount", name("textInput")) + // Use a flatmap transformation to extract words from the incoming stream of lines. + .flatMap(new Function.FlatMapFunction<String, String>() + { + @Override + public Iterable<String> f(String input) + { + return Arrays.asList(input.split("[^a-zA-Z']+")); + + } + }, name("ExtractWords")) + // Apply windowing to the stream for counting, in this case, the window option is global window. + .window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1)) + // Count the appearances of every word. + .countByKey(new Function.ToKeyValue<String, String, Long>() + { + @Override + public Tuple<KeyValPair<String, Long>> f(String input) + { + return new Tuple.PlainTuple<KeyValPair<String, Long>>(new KeyValPair<String, Long>(input, 1L)); + } + }, name("countByKey")) + // Format the counting result to a readable format by unwrapping the tuples. + .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, KeyValPair<String, Long>>() + { + @Override + public KeyValPair<String, Long> f(Tuple.WindowedTuple<KeyValPair<String, Long>> input) + { + return input.getValue(); + } + }, name("FormatResults")) + // Print the result. + .print() + // Attach a collector to the stream to collect results. 
+ .endWith(collector, collector.input, name("Collector")) + // populate the dag using the stream. + .populateDag(dag); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java new file mode 100644 index 0000000..6a6777e --- /dev/null +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java @@ -0,0 +1,277 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.stream.sample; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import org.joda.time.Duration; +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.WindowOption; +import org.apache.apex.malhar.stream.api.ApexStream; +import org.apache.apex.malhar.stream.api.WindowedStream; +import org.apache.apex.malhar.stream.api.function.Function; +import org.apache.apex.malhar.stream.api.impl.StreamFactory; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; + +import com.google.common.base.Throwables; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.util.KeyValPair; + +import static org.apache.apex.malhar.stream.api.Option.Options.name; + +/** + * Beam WindowedWordCount Example. + */ +@ApplicationAnnotation(name = "WindowedWordCount") +public class WindowedWordCount implements StreamingApplication +{ + static final int WINDOW_SIZE = 1; // Default window duration in minutes + + /** + * A input operator that reads from and output a file line by line to downstream with a time gap between + * every two lines. 
+ */ + public static class TextInput extends BaseOperator implements InputOperator + { + private static boolean done = false; + + public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>(); + + private transient BufferedReader reader; + + public static boolean isDone() + { + return done; + } + + @Override + public void setup(Context.OperatorContext context) + { + done = false; + initReader(); + } + + private void initReader() + { + try { + InputStream resourceStream = this.getClass().getResourceAsStream("/wordcount/word.txt"); + reader = new BufferedReader(new InputStreamReader(resourceStream)); + } catch (Exception ex) { + throw Throwables.propagate(ex); + } + } + + @Override + public void teardown() + { + IOUtils.closeQuietly(reader); + } + + @Override + public void emitTuples() + { + try { + String line = reader.readLine(); + if (line == null) { + done = true; + reader.close(); + Thread.sleep(1000); + } else { + this.output.emit(line); + } + Thread.sleep(50); + } catch (IOException ex) { + throw new RuntimeException(ex); + } catch (InterruptedException e) { + throw Throwables.propagate(e); + } + } + } + + public static class Collector extends BaseOperator + { + private static Map<KeyValPair<Long, String>, Long> result = new HashMap<>(); + + public static Map<KeyValPair<Long, String>, Long> getResult() + { + return result; + } + + public final transient DefaultInputPort<PojoEvent> input = new DefaultInputPort<PojoEvent>() + { + @Override + public void process(PojoEvent tuple) + { + result.put(new KeyValPair<Long, String>(tuple.getTimestamp(), tuple.getWord()), tuple.getCount()); + } + }; + } + + /** + * A Pojo Tuple class used for outputting result to JDBC. 
+ */ + public static class PojoEvent + { + private String word; + private long count; + private long timestamp; + + @Override + public String toString() + { + return "PojoEvent (word=" + getWord() + ", count=" + getCount() + ", timestamp=" + getTimestamp() + ")"; + } + + public String getWord() + { + return word; + } + + public void setWord(String word) + { + this.word = word; + } + + public long getCount() + { + return count; + } + + public void setCount(long count) + { + this.count = count; + } + + public long getTimestamp() + { + return timestamp; + } + + public void setTimestamp(long timestamp) + { + this.timestamp = timestamp; + } + } + + /** + * A map function that wraps the input string with a randomly generated timestamp. + */ + public static class AddTimestampFn implements Function.MapFunction<String, Tuple.TimestampedTuple<String>> + { + private static final Duration RAND_RANGE = Duration.standardMinutes(10); + private final Long minTimestamp; + + AddTimestampFn() + { + this.minTimestamp = System.currentTimeMillis(); + } + + @Override + public Tuple.TimestampedTuple<String> f(String input) + { + // Generate a timestamp that falls within RAND_RANGE (10 minutes) after minTimestamp. + long randMillis = (long)(Math.random() * RAND_RANGE.getMillis()); + long randomTimestamp = minTimestamp + randMillis; + + return new Tuple.TimestampedTuple<>(randomTimestamp, input); + } + } + + /** A MapFunction that converts a Word and Count into a PojoEvent. */ + public static class FormatAsTableRowFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, PojoEvent> + { + @Override + public PojoEvent f(Tuple.WindowedTuple<KeyValPair<String, Long>> input) + { + PojoEvent row = new PojoEvent(); + row.setTimestamp(input.getTimestamp()); + row.setCount(input.getValue().getValue()); + row.setWord(input.getValue().getKey()); + return row; + } + } + + /** + * Populate dag with High-Level API. 
+ * @param dag + * @param conf + */ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + TextInput input = new TextInput(); + Collector collector = new Collector(); + + // Create stream from the TextInput operator. + ApexStream<Tuple.TimestampedTuple<String>> stream = StreamFactory.fromInput(input, input.output, name("input")) + + // Extract all the words from the input line of text. + .flatMap(new Function.FlatMapFunction<String, String>() + { + @Override + public Iterable<String> f(String input) + { + return Arrays.asList(input.split("[\\p{Punct}\\s]+")); + } + }, name("ExtractWords")) + + // Wrap the word with a randomly generated timestamp. + .map(new AddTimestampFn(), name("AddTimestampFn")); + + + // apply window and trigger option. + // TODO: change trigger option to atWaterMark when available. + WindowedStream<Tuple.TimestampedTuple<String>> windowedWords = stream + .window(new WindowOption.TimeWindows(Duration.standardMinutes(WINDOW_SIZE)), + new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1)); + + + WindowedStream<PojoEvent> wordCounts = + // Perform a countByKey transformation to count the appearance of each word in every time window. + windowedWords.countByKey(new Function.ToKeyValue<Tuple.TimestampedTuple<String>, String, Long>() + { + @Override + public Tuple<KeyValPair<String, Long>> f(Tuple.TimestampedTuple<String> input) + { + return new Tuple.TimestampedTuple<KeyValPair<String, Long>>(input.getTimestamp(), + new KeyValPair<String, Long>(input.getValue(), 1L)); + } + }, name("count words")) + + // Format the output and print out the result. 
+ .map(new FormatAsTableRowFn(), name("FormatAsTableRowFn")).print(); + + wordCounts.endWith(collector, collector.input, name("Collector")).populateDag(dag); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java new file mode 100644 index 0000000..29c8cf9 --- /dev/null +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java @@ -0,0 +1,308 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.stream.sample.complete; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.Window; +import org.apache.apex.malhar.lib.window.WindowOption; +import org.apache.apex.malhar.stream.api.ApexStream; +import org.apache.apex.malhar.stream.api.CompositeStreamTransform; +import org.apache.apex.malhar.stream.api.WindowedStream; +import org.apache.apex.malhar.stream.api.function.Function; +import org.apache.apex.malhar.stream.api.impl.StreamFactory; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; + +import com.google.common.base.Throwables; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.util.KeyValPair; + +import static org.apache.apex.malhar.stream.api.Option.Options.name; + +/** + * An example that computes the most popular hash tags + * for every prefix, which can be used for auto-completion. + * This application is identical to TwitterAutoComplete, except it's + * reading from a file. This application is mainly for local testing + * purpose. + * + * <p>This will update the datastore every 10 seconds based on the last + * 30 minutes of data received. 
+ */ +@ApplicationAnnotation(name = "AutoComplete") +public class AutoComplete implements StreamingApplication +{ + + /** + * A dummy Twitter input operator. It reads from a text file containing some tweets and output a line every + * half of a second. + */ + public static class TweetsInput extends BaseOperator implements InputOperator + { + private static boolean done = false; + public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>(); + + private transient BufferedReader reader; + + public static boolean isDone() + { + return done; + } + + @Override + public void setup(OperatorContext context) + { + done = false; + initReader(); + } + + private void initReader() + { + try { + InputStream resourceStream = this.getClass().getResourceAsStream("/sampletweets.txt"); + reader = new BufferedReader(new InputStreamReader(resourceStream)); + } catch (Exception ex) { + throw Throwables.propagate(ex); + } + } + + @Override + public void teardown() + { + IOUtils.closeQuietly(reader); + } + + @Override + public void emitTuples() + { + try { + String line = reader.readLine(); + if (line == null) { + done = true; + reader.close(); + Thread.sleep(1000); + } else { + this.output.emit(line); + } + Thread.sleep(50); + } catch (IOException ex) { + throw new RuntimeException(ex); + } catch (InterruptedException e) { + // Ignore it. 
+ } + } + } + + public static class Collector extends BaseOperator + { + private static Map<String, List<CompletionCandidate>> result = new HashMap<>(); + + public static Map<String, List<CompletionCandidate>> getResult() + { + return result; + } + + public final transient DefaultInputPort<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> input = new DefaultInputPort<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>() + { + @Override + public void process(Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>> tuple) + { + result.put(tuple.getValue().getKey(), tuple.getValue().getValue()); + } + }; + } + + /** + * FlatMap function that extracts all hashtags from the text of a tweet. + */ + private static class ExtractHashtags implements Function.FlatMapFunction<String, String> + { + + @Override + public Iterable<String> f(String input) + { + List<String> result = new LinkedList<>(); + Matcher m = Pattern.compile("#\\S+").matcher(input); + while (m.find()) { + result.add(m.group().substring(1)); + } + return result; + } + } + + /** + * Lower latency, but more expensive. 
+ */ + private static class ComputeTopFlat + extends CompositeStreamTransform<WindowedStream<CompletionCandidate>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>> + { + private final int candidatesPerPrefix; + private final int minPrefix; + + public ComputeTopFlat(int candidatesPerPrefix, int minPrefix) + { + this.candidatesPerPrefix = candidatesPerPrefix; + this.minPrefix = minPrefix; + } + + @Override + public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose( + WindowedStream<CompletionCandidate> input) + { + return input + .<KeyValPair<String, CompletionCandidate>, WindowedStream<KeyValPair<String, CompletionCandidate>>>flatMap(new AllPrefixes(minPrefix)) + .accumulateByKey(new TopNByKey(), new Function.ToKeyValue<KeyValPair<String, CompletionCandidate>, String, + CompletionCandidate>() + { + @Override + public Tuple<KeyValPair<String, CompletionCandidate>> f(KeyValPair<String, CompletionCandidate> tuple) + { + // TODO: Should be removed after Auto-wrapping is supported. 
+ return new Tuple.WindowedTuple<>(Window.GLOBAL_WINDOW, tuple); + } + }); + } + } + + /** + * FlatMap function that extracts all prefixes of the hashtag in the input CompletionCandidate and outputs + * KeyValPairs of each prefix and the CompletionCandidate. + */ + private static class AllPrefixes implements Function.FlatMapFunction<CompletionCandidate, KeyValPair<String, CompletionCandidate>> + { + private final int minPrefix; + private final int maxPrefix; + + public AllPrefixes() + { + this(0, Integer.MAX_VALUE); + } + + public AllPrefixes(int minPrefix) + { + this(minPrefix, Integer.MAX_VALUE); + } + + public AllPrefixes(int minPrefix, int maxPrefix) + { + this.minPrefix = minPrefix; + this.maxPrefix = maxPrefix; + } + + @Override + public Iterable<KeyValPair<String, CompletionCandidate>> f(CompletionCandidate input) + { + List<KeyValPair<String, CompletionCandidate>> result = new LinkedList<>(); + String word = input.getValue(); + for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) { + + result.add(new KeyValPair<>(input.getValue().substring(0, i).toLowerCase(), input)); + } + return result; + } + } + + /** + * A Composite stream transform that takes as input a list of tokens and returns + * the most common tokens per prefix. 
+ */ + public static class ComputeTopCompletions + extends CompositeStreamTransform<WindowedStream<String>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>> + { + private final int candidatesPerPrefix; + private final boolean recursive; + + protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive) + { + this.candidatesPerPrefix = candidatesPerPrefix; + this.recursive = recursive; + } + + public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive) + { + return new ComputeTopCompletions(candidatesPerPrefix, recursive); + } + + @Override + @SuppressWarnings("unchecked") + public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(WindowedStream<String> inputStream) + { + ApexStream<CompletionCandidate> candidates = inputStream + .countByKey(new Function.ToKeyValue<String, String, Long>() + { + @Override + public Tuple<KeyValPair<String, Long>> f(String input) + { + return new Tuple.PlainTuple<>(new KeyValPair<>(input, 1L)); + } + }, name("countByKey")).map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String,Long>>, CompletionCandidate>() + { + @Override + public CompletionCandidate f(Tuple.WindowedTuple<KeyValPair<String, Long>> input) + { + return new CompletionCandidate(input.getValue().getKey(), input.getValue().getValue()); + } + }, name("ToCompletionCandidate")); + + return candidates.addCompositeStreams(new ComputeTopFlat(10, 1)); + + } + } + + /** + * Populate the dag with High-Level API. 
+ * @param dag + * @param conf + */ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + TweetsInput input = new TweetsInput(); + Collector collector = new Collector(); + + WindowOption windowOption = new WindowOption.GlobalWindow(); + + ApexStream<String> tags = StreamFactory.fromInput(input, input.output, name("tweetSampler")) + .flatMap(new ExtractHashtags()); + + tags.window(windowOption, new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1)) + .addCompositeStreams(ComputeTopCompletions.top(10, true)).endWith(collector, collector.input, name("collector")) + .populateDag(dag); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java new file mode 100644 index 0000000..8a7113e --- /dev/null +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.sample.complete; + +/** + * Class used to store tag-count pairs in Auto Complete Demo. + */ +public class CompletionCandidate implements Comparable<CompletionCandidate> +{ + private long count; + private String value; + + public CompletionCandidate(String value, long count) + { + this.value = value; + this.count = count; + } + + public long getCount() + { + return count; + } + + public String getValue() + { + return value; + } + + // Empty constructor required for Kryo. + public CompletionCandidate() + { + + } + + @Override + public int compareTo(CompletionCandidate o) + { + if (this.count < o.count) { + return -1; + } else if (this.count == o.count) { + return this.value.compareTo(o.value); + } else { + return 1; + } + } + + @Override + public boolean equals(Object other) + { + if (other instanceof CompletionCandidate) { + CompletionCandidate that = (CompletionCandidate)other; + return this.count == that.count && this.value.equals(that.value); + } else { + return false; + } + } + + @Override + public int hashCode() + { + return Long.valueOf(count).hashCode() ^ value.hashCode(); + } + + @Override + public String toString() + { + return "CompletionCandidate[" + value + ", " + count + "]"; + } +} + + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java new file mode 100644 index 0000000..2a4c003 --- /dev/null +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java @@ -0,0 +1,44 @@ +/** + * Licensed to 
/**
 * Tuple Class for JdbcOutput of StreamingWordExtract.
 *
 * <p>A plain bean with a single {@code stringValue} property, exposed through
 * a getter/setter pair.
 */
public class PojoEvent
{
  private String stringValue;

  /**
   * @return the current string value, or null if it was never set
   */
  public String getStringValue()
  {
    return this.stringValue;
  }

  /**
   * @param newString the value to store
   */
  public void setStringValue(String newString)
  {
    this.stringValue = newString;
  }

  /**
   * Renders the bean using the value returned by {@link #getStringValue()}.
   */
  @Override
  public String toString()
  {
    StringBuilder text = new StringBuilder();
    text.append("PojoEvent [stringValue=");
    text.append(getStringValue());
    text.append("]");
    return text.toString();
  }
}
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.sample.complete; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.apex.malhar.stream.api.ApexStream; +import org.apache.apex.malhar.stream.api.Option; +import org.apache.apex.malhar.stream.api.function.Function; +import org.apache.apex.malhar.stream.api.impl.StreamFactory; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.lib.db.jdbc.JdbcFieldInfo; +import com.datatorrent.lib.db.jdbc.JdbcPOJOInsertOutputOperator; +import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore; + +import static java.sql.Types.VARCHAR; + +/** + * Beam StreamingWordExtract Example. + */ +@ApplicationAnnotation(name = "StreamingWordExtract") +public class StreamingWordExtract implements StreamingApplication +{ + private static int wordCount = 0; // A counter to count number of words have been extracted. + private static int entriesMapped = 0; // A counter to count number of entries have been mapped. 
+ + public int getWordCount() + { + return wordCount; + } + + public int getEntriesMapped() + { + return entriesMapped; + } + + /** + * A MapFunction that tokenizes lines of text into individual words. + */ + public static class ExtractWords implements Function.FlatMapFunction<String, String> + { + @Override + public Iterable<String> f(String input) + { + List<String> result = new ArrayList<>(Arrays.asList(input.split("[^a-zA-Z0-9']+"))); + wordCount += result.size(); + return result; + } + } + + + /** + * A MapFunction that uppercases a word. + */ + public static class Uppercase implements Function.MapFunction<String, String> + { + @Override + public String f(String input) + { + return input.toUpperCase(); + } + } + + + /** + * A filter function to filter out empty strings. + */ + public static class EmptyStringFilter implements Function.FilterFunction<String> + { + @Override + public boolean f(String input) + { + return !input.isEmpty(); + } + } + + + /** + * A map function to map the result string to a pojo entry. + */ + public static class PojoMapper implements Function.MapFunction<String, Object> + { + + @Override + public Object f(String input) + { + PojoEvent pojo = new PojoEvent(); + pojo.setStringValue(input); + entriesMapped++; + return pojo; + } + } + + /** + * Add field infos to the {@link JdbcPOJOInsertOutputOperator}. + */ + private static List<JdbcFieldInfo> addFieldInfos() + { + List<JdbcFieldInfo> fieldInfos = new ArrayList<>(); + fieldInfos.add(new JdbcFieldInfo("STRINGVALUE", "stringValue", JdbcFieldInfo.SupportType.STRING, VARCHAR)); + return fieldInfos; + } + + /** + * Populate dag with High-Level API. 
+ * @param dag + * @param conf + */ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + JdbcPOJOInsertOutputOperator jdbcOutput = new JdbcPOJOInsertOutputOperator(); + jdbcOutput.setFieldInfos(addFieldInfos()); + JdbcTransactionalStore outputStore = new JdbcTransactionalStore(); + jdbcOutput.setStore(outputStore); + jdbcOutput.setTablename("TestTable"); + + // Create a stream reading from a folder. + ApexStream<String> stream = StreamFactory.fromFolder("./src/test/resources/data"); + + // Extract all the words from the input line of text. + stream.flatMap(new ExtractWords()) + + // Filter out the empty strings. + .filter(new EmptyStringFilter()) + + // Change every word to uppercase. + .map(new Uppercase()) + + // Map the resulted word to a Pojo entry. + .map(new PojoMapper()) + + // Output the entries to JdbcOutput and insert them into a table. + .endWith(jdbcOutput, jdbcOutput.input, Option.Options.name("jdbcOutput")); + + stream.populateDag(dag); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopNByKey.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopNByKey.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopNByKey.java new file mode 100644 index 0000000..a9e7744 --- /dev/null +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopNByKey.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.sample.complete; + +import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.apex.malhar.lib.window.Accumulation; + +/** + * Specialized TopNByKey accumulation for AutoComplete Demo. + */ +public class TopNByKey implements + Accumulation<CompletionCandidate, Map<String, Long>, List<CompletionCandidate>> +{ + int n = 10; + + Comparator comparator; + + public void setN(int n) + { + this.n = n; + } + + public void setComparator(Comparator comparator) + { + this.comparator = comparator; + } + + @Override + public Map<String, Long> defaultAccumulatedValue() + { + return new HashMap<>(); + } + + /** + * Accumulate the input. Update the entry in the Accumulation Map if the key of the input is existed, create a + * new entry otherwise. + * @param accumulatedValue + * @param input + * @return + */ + @Override + public Map<String, Long> accumulate(Map<String, Long> accumulatedValue, CompletionCandidate input) + { + accumulatedValue.put(input.getValue(), input.getCount()); + return accumulatedValue; + } + + /** + * Merge two Maps together. For every key, keep the larger value in the resulted Map. 
+ * @param accumulatedValue1 + * @param accumulatedValue2 + * @return + */ + @Override + public Map<String, Long> merge(Map<String, Long> accumulatedValue1, Map<String, Long> accumulatedValue2) + { + for (Map.Entry<String, Long> entry : accumulatedValue2.entrySet()) { + if (accumulatedValue1.containsKey(entry.getKey()) && accumulatedValue1.get(entry.getKey()) > entry.getValue()) { + continue; + } + accumulatedValue1.put(entry.getKey(), entry.getValue()); + } + return accumulatedValue1; + } + + /** + * Loop through the Accumulation Map to get the top n entries based on their values, return a list containing + * those entries. + * @param accumulatedValue + * @return + */ + @Override + public List<CompletionCandidate> getOutput(Map<String, Long> accumulatedValue) + { + LinkedList<CompletionCandidate> result = new LinkedList<>(); + for (Map.Entry<String, Long> entry : accumulatedValue.entrySet()) { + int k = 0; + for (CompletionCandidate inMemory : result) { + if (entry.getValue() > inMemory.getCount()) { + break; + } + k++; + } + result.add(k, new CompletionCandidate(entry.getKey(), entry.getValue())); + if (result.size() > n) { + result.remove(result.get(result.size() - 1)); + } + } + return result; + } + + @Override + public List<CompletionCandidate> getRetraction(List<CompletionCandidate> value) + { + return new LinkedList<>(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java new file mode 100644 index 0000000..de4e590 --- /dev/null +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java @@ -0,0 
+1,340 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.sample.complete; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Random; +import javax.annotation.Nullable; + +import org.joda.time.Duration; + +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.WindowOption; +import org.apache.apex.malhar.lib.window.impl.accumulation.TopN; +import org.apache.apex.malhar.stream.api.ApexStream; +import org.apache.apex.malhar.stream.api.CompositeStreamTransform; +import org.apache.apex.malhar.stream.api.WindowedStream; +import org.apache.apex.malhar.stream.api.function.Function; +import org.apache.apex.malhar.stream.api.impl.StreamFactory; +import org.apache.hadoop.conf.Configuration; + + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import 
com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.util.KeyValPair; + +import static org.apache.apex.malhar.stream.api.Option.Options.name; + +/** + * Beam's TopWikipediaSessions Example. + */ +@ApplicationAnnotation(name = "TopWikipediaSessions") +public class TopWikipediaSessions implements StreamingApplication +{ + /** + * A generator that outputs a stream of combinations of some users and some randomly generated edit time. + */ + public static class SessionGen extends BaseOperator implements InputOperator + { + private String[] names = new String[]{"user1", "user2", "user3", "user4"}; + public transient DefaultOutputPort<KeyValPair<String, Long>> output = new DefaultOutputPort<>(); + + private static final Duration RAND_RANGE = Duration.standardDays(365); + private Long minTimestamp; + private long sleepTime; + private static int tupleCount = 0; + + public static int getTupleCount() + { + return tupleCount; + } + + private String randomName(String[] names) + { + int index = new Random().nextInt(names.length); + return names[index]; + } + + @Override + public void setup(Context.OperatorContext context) + { + super.setup(context); + tupleCount = 0; + minTimestamp = System.currentTimeMillis(); + sleepTime = context.getValue(Context.OperatorContext.SPIN_MILLIS); + } + + @Override + public void emitTuples() + { + long randMillis = (long)(Math.random() * RAND_RANGE.getMillis()); + long randomTimestamp = minTimestamp + randMillis; + output.emit(new KeyValPair<String, Long>(randomName(names), randomTimestamp)); + tupleCount++; + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + // Ignore it. 
+ } + } + } + + public static class Collector extends BaseOperator + { + private final int resultSize = 5; + private static List<List<TempWrapper>> result = new ArrayList<>(); + + public static List<List<TempWrapper>> getResult() + { + return result; + } + + public final transient DefaultInputPort<Tuple.WindowedTuple<List<TempWrapper>>> input = new DefaultInputPort<Tuple.WindowedTuple<List<TempWrapper>>>() + { + @Override + public void process(Tuple.WindowedTuple<List<TempWrapper>> tuple) + { + if (result.size() == resultSize) { + result.remove(0); + } + result.add(tuple.getValue()); + } + }; + } + + + /** + * Convert the upstream (user, time) combination to a timestamped tuple of user. + */ + static class ExtractUserAndTimestamp implements Function.MapFunction<KeyValPair<String, Long>, Tuple.TimestampedTuple<String>> + { + @Override + public Tuple.TimestampedTuple<String> f(KeyValPair<String, Long> input) + { + long timestamp = input.getValue(); + String userName = input.getKey(); + + // Sets the implicit timestamp field to be used in windowing. + return new Tuple.TimestampedTuple<>(timestamp, userName); + + } + } + + /** + * Computes the number of edits in each user session. A session is defined as + * a string of edits where each is separated from the next by less than an hour. + */ + static class ComputeSessions + extends CompositeStreamTransform<ApexStream<Tuple.TimestampedTuple<String>>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, Long>>>> + { + @Override + public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, Long>>> compose(ApexStream<Tuple.TimestampedTuple<String>> inputStream) + { + return inputStream + + // Chuck the stream into session windows. + .window(new WindowOption.SessionWindows(Duration.standardHours(1)), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1)) + + // Count the number of edits for a user within one session. 
+ .countByKey(new Function.ToKeyValue<Tuple.TimestampedTuple<String>, String, Long>() + { + @Override + public Tuple.TimestampedTuple<KeyValPair<String, Long>> f(Tuple.TimestampedTuple<String> input) + { + return new Tuple.TimestampedTuple<KeyValPair<String, Long>>(input.getTimestamp(), new KeyValPair<String, Long>(input.getValue(), 1L)); + } + }, name("ComputeSessions")); + } + } + + /** + * A comparator class used for comparing two TempWrapper objects. + */ + public static class Comp implements Comparator<TempWrapper> + { + @Override + public int compare(TempWrapper o1, TempWrapper o2) + { + return Long.compare(o1.getValue().getValue(), o2.getValue().getValue()); + } + } + + /** + * A function to extract timestamp from a TempWrapper object. + */ + // TODO: Need to revisit and change back to using TimestampedTuple. + public static class TimestampExtractor implements com.google.common.base.Function<TempWrapper, Long> + { + @Override + public Long apply(@Nullable TempWrapper input) + { + return input.getTimestamp(); + } + } + + /** + * A temporary wrapper to wrap a KeyValPair and a timestamp together to represent a timestamped tuple, the reason + * for this is that we cannot resolve a type conflict when calling accumulate(). After the issue resolved, we can + * remove this class. 
+ */ + public static class TempWrapper + { + private KeyValPair<String, Long> value; + private Long timestamp; + + public TempWrapper() + { + + } + + public TempWrapper(KeyValPair<String, Long> value, Long timestamp) + { + this.value = value; + this.timestamp = timestamp; + } + + @Override + public String toString() + { + return this.value + " - " + this.timestamp; + } + + public Long getTimestamp() + { + return timestamp; + } + + public void setTimestamp(Long timestamp) + { + this.timestamp = timestamp; + } + + public KeyValPair<String, Long> getValue() + { + return value; + } + + public void setValue(KeyValPair<String, Long> value) + { + this.value = value; + } + } + + /** + * Computes the longest session ending in each month, in this case we use 30 days to represent every month. + */ + private static class TopPerMonth + extends CompositeStreamTransform<ApexStream<Tuple.WindowedTuple<KeyValPair<String, Long>>>, WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>>> + { + + @Override + public WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>> compose(ApexStream<Tuple.WindowedTuple<KeyValPair<String, Long>>> inputStream) + { + TopN<TempWrapper> topN = new TopN<>(); + topN.setN(10); + topN.setComparator(new Comp()); + + return inputStream + + // Map the input WindowedTuple to a TempWrapper object. + .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, TempWrapper>() + { + @Override + public TempWrapper f(Tuple.WindowedTuple<KeyValPair<String, Long>> input) + { + return new TempWrapper(input.getValue(), input.getWindows().get(0).getBeginTimestamp()); + } + }, name("TempWrapper")) + + // Apply window and trigger option again, this time chuck the stream into fixed time windows. + .window(new WindowOption.TimeWindows(Duration.standardDays(30)), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(5))) + + // Compute the top 10 user-sessions with most number of edits. 
+ .accumulate(topN, name("TopN")).with("timestampExtractor", new TimestampExtractor()); + } + } + + /** + * A map function that combine the user and his/her edit session together to a string and use that string as a key + * with number of edits in that session as value to create a new key value pair to send to downstream. + */ + static class SessionsToStringsDoFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, Tuple.WindowedTuple<KeyValPair<String, Long>>> + { + @Override + public Tuple.WindowedTuple<KeyValPair<String, Long>> f(Tuple.WindowedTuple<KeyValPair<String, Long>> input) + { + return new Tuple.WindowedTuple<KeyValPair<String, Long>>(input.getWindows().get(0), new KeyValPair<String, Long>( + input.getValue().getKey() + " : " + input.getWindows().get(0).getBeginTimestamp() + " : " + input.getWindows().get(0).getDurationMillis(), + input.getValue().getValue())); + } + } + + /** + * A flapmap function that turns the result into readable format. + */ + static class FormatOutputDoFn implements Function.FlatMapFunction<Tuple.WindowedTuple<List<TempWrapper>>, String> + { + @Override + public Iterable<String> f(Tuple.WindowedTuple<List<TempWrapper>> input) + { + ArrayList<String> result = new ArrayList<>(); + for (TempWrapper item : input.getValue()) { + String session = item.getValue().getKey(); + long count = item.getValue().getValue(); + result.add(session + " + " + count + " : " + input.getWindows().get(0).getBeginTimestamp()); + } + return result; + } + } + + /** + * A composite transform that compute the top wikipedia sessions. 
+ */ + public static class ComputeTopSessions extends CompositeStreamTransform<ApexStream<KeyValPair<String, Long>>, WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>>> + { + @Override + public WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>> compose(ApexStream<KeyValPair<String, Long>> inputStream) + { + return inputStream + .map(new ExtractUserAndTimestamp(), name("ExtractUserAndTimestamp")) + .addCompositeStreams(new ComputeSessions()) + .map(new SessionsToStringsDoFn(), name("SessionsToStringsDoFn")) + .addCompositeStreams(new TopPerMonth()); + } + } + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + SessionGen sg = new SessionGen(); + Collector collector = new Collector(); + StreamFactory.fromInput(sg, sg.output, name("sessionGen")) + .addCompositeStreams(new ComputeTopSessions()) + .endWith(collector, collector.input, name("collector")).populateDag(dag); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java new file mode 100644 index 0000000..2cc04d1 --- /dev/null +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java @@ -0,0 +1,521 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.sample.complete; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.List; +import java.util.Map; + +import javax.annotation.Nullable; + +import org.joda.time.Duration; + +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.WindowOption; +import org.apache.apex.malhar.lib.window.impl.accumulation.Group; +import org.apache.apex.malhar.stream.api.ApexStream; +import org.apache.apex.malhar.stream.api.CompositeStreamTransform; +import org.apache.apex.malhar.stream.api.WindowedStream; +import org.apache.apex.malhar.stream.api.function.Function; +import org.apache.apex.malhar.stream.api.impl.StreamFactory; +import org.apache.hadoop.conf.Configuration; + +import com.google.common.collect.Lists; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.util.KeyValPair; + +import static org.apache.apex.malhar.stream.api.Option.Options.name; + +/** + * Beam's TrafficRoutes example. 
+ */ +@ApplicationAnnotation(name = "TrafficRoutes") +public class TrafficRoutes implements StreamingApplication +{ + static Map<String, String> sdStations = buildStationInfo(); + static final int WINDOW_DURATION = 3; // Default sliding window duration in minutes + static final int WINDOW_SLIDE_EVERY = 1; // Default window 'slide every' setting in minutes + + /** + * This class holds information about a station reading's average speed. + */ + public static class StationSpeed implements Comparable<StationSpeed> + { + @Nullable + String stationId; + @Nullable + Double avgSpeed; + @Nullable + Long timestamp; + + public StationSpeed() {} + + public StationSpeed(String stationId, Double avgSpeed, Long timestamp) + { + this.stationId = stationId; + this.avgSpeed = avgSpeed; + this.timestamp = timestamp; + } + + public void setAvgSpeed(@Nullable Double avgSpeed) + { + this.avgSpeed = avgSpeed; + } + + public void setStationId(@Nullable String stationId) + { + this.stationId = stationId; + } + + public void setTimestamp(@Nullable Long timestamp) + { + this.timestamp = timestamp; + } + + @Nullable + public Long getTimestamp() + { + return timestamp; + } + + public String getStationId() + { + return this.stationId; + } + + public Double getAvgSpeed() + { + return this.avgSpeed; + } + + @Override + public int compareTo(StationSpeed other) + { + return Long.compare(this.timestamp, other.timestamp); + } + } + + /** + * This class holds information about a route's speed/slowdown. 
+ */ + static class RouteInfo + { + @Nullable + String route; + @Nullable + Double avgSpeed; + @Nullable + Boolean slowdownEvent; + + public RouteInfo() + { + + } + + public RouteInfo(String route, Double avgSpeed, Boolean slowdownEvent) + { + this.route = route; + this.avgSpeed = avgSpeed; + this.slowdownEvent = slowdownEvent; + } + + public String getRoute() + { + return this.route; + } + + public Double getAvgSpeed() + { + return this.avgSpeed; + } + + public Boolean getSlowdownEvent() + { + return this.slowdownEvent; + } + } + + /** + * Extract the timestamp field from the input string, and wrap the input string in a {@link Tuple.TimestampedTuple} + * with the extracted timestamp. + */ + static class ExtractTimestamps implements Function.MapFunction<String, Tuple.TimestampedTuple<String>> + { + + @Override + public Tuple.TimestampedTuple<String> f(String input) + { + String[] items = input.split(","); + String timestamp = tryParseTimestamp(items); + + return new Tuple.TimestampedTuple<>(Long.parseLong(timestamp), input); + } + } + + /** + * Filter out readings for the stations along predefined 'routes', and output + * (station, speed info) keyed on route. + */ + static class ExtractStationSpeedFn implements Function.FlatMapFunction<Tuple.TimestampedTuple<String>, KeyValPair<String, StationSpeed>> + { + + @Override + public Iterable<KeyValPair<String, StationSpeed>> f(Tuple.TimestampedTuple<String> input) + { + + ArrayList<KeyValPair<String, StationSpeed>> result = new ArrayList<>(); + String[] items = input.getValue().split(","); + String stationType = tryParseStationType(items); + // For this analysis, use only 'main line' station types + if (stationType != null && stationType.equals("ML")) { + Double avgSpeed = tryParseAvgSpeed(items); + String stationId = tryParseStationId(items); + // For this simple example, filter out everything but some hardwired routes. 
+ if (avgSpeed != null && stationId != null && sdStations.containsKey(stationId)) { + StationSpeed stationSpeed = + new StationSpeed(stationId, avgSpeed, input.getTimestamp()); + // The tuple key is the 'route' name stored in the 'sdStations' hash. + KeyValPair<String, StationSpeed> outputValue = new KeyValPair<>(sdStations.get(stationId), stationSpeed); + result.add(outputValue); + } + } + return result; + } + } + + /** + * For a given route, track average speed for the window. Calculate whether + * traffic is currently slowing down, via a predefined threshold. If a supermajority of + * speeds in this sliding window are less than the previous reading we call this a 'slowdown'. + * Note: these calculations are for example purposes only, and are unrealistic and oversimplified. + */ + static class GatherStats + implements Function.FlatMapFunction<Tuple.WindowedTuple<KeyValPair<String, List<StationSpeed>>>, Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> + { + @Override + public Iterable<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> f(Tuple.WindowedTuple<KeyValPair<String, List<StationSpeed>>> input) + { + ArrayList<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> result = new ArrayList<>(); + String route = input.getValue().getKey(); + double speedSum = 0.0; + int speedCount = 0; + int speedups = 0; + int slowdowns = 0; + List<StationSpeed> infoList = Lists.newArrayList(input.getValue().getValue()); + // StationSpeeds sort by embedded timestamp. + Collections.sort(infoList); + Map<String, Double> prevSpeeds = new HashMap<>(); + // For all stations in the route, sum (non-null) speeds. Keep a count of the non-null speeds. 
+ for (StationSpeed item : infoList) { + Double speed = item.getAvgSpeed(); + if (speed != null) { + speedSum += speed; + speedCount++; + Double lastSpeed = prevSpeeds.get(item.getStationId()); + if (lastSpeed != null) { + if (lastSpeed < speed) { + speedups += 1; + } else { + slowdowns += 1; + } + } + prevSpeeds.put(item.getStationId(), speed); + } + } + if (speedCount == 0) { + // No average to compute. + return result; + } + double speedAvg = speedSum / speedCount; + boolean slowdownEvent = slowdowns >= 2 * speedups; + RouteInfo routeInfo = new RouteInfo(route, speedAvg, slowdownEvent); + result.add(new Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>(input.getTimestamp(), new KeyValPair<String, RouteInfo>(route, routeInfo))); + return result; + } + } + + /** + * Output Pojo class for outputting result to JDBC. + */ + static class OutputPojo + { + private Double avgSpeed; + private Boolean slowdownEvent; + private String key; + private Long timestamp; + + public OutputPojo() + { + } + + public OutputPojo(Double avgSpeed, Boolean slowdownEvent, String key, Long timestamp) + { + this.avgSpeed = avgSpeed; + this.slowdownEvent = slowdownEvent; + this.key = key; + this.timestamp = timestamp; + } + + @Override + public String toString() + { + return key + " + " + avgSpeed + " + " + slowdownEvent + " + " + timestamp; + } + + public void setTimestamp(Long timestamp) + { + this.timestamp = timestamp; + } + + public Long getTimestamp() + { + return timestamp; + } + + public void setAvgSpeed(Double avgSpeed) + { + this.avgSpeed = avgSpeed; + } + + public Double getAvgSpeed() + { + return avgSpeed; + } + + public void setKey(String key) + { + this.key = key; + } + + public String getKey() + { + return key; + } + + public void setSlowdownEvent(Boolean slowdownEvent) + { + this.slowdownEvent = slowdownEvent; + } + + public Boolean getSlowdownEvent() + { + return slowdownEvent; + } + + } + + public static class Collector extends BaseOperator + { + private static 
Map<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> result = new HashMap<>(); + + public static Map<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> getResult() + { + return result; + } + + public final transient DefaultInputPort<OutputPojo> input = new DefaultInputPort<OutputPojo>() + { + @Override + public void process(OutputPojo tuple) + { + result.put(new KeyValPair<Long, String>(tuple.getTimestamp(), tuple.getKey()), new KeyValPair<Double, Boolean>(tuple.getAvgSpeed(), tuple.getSlowdownEvent())); + } + }; + } + + /** + * Format the results of the slowdown calculations to a OutputPojo. + */ + static class FormatStatsFn implements Function.MapFunction<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>, OutputPojo> + { + @Override + public OutputPojo f(Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>> input) + { + RouteInfo routeInfo = input.getValue().getValue(); + OutputPojo row = new OutputPojo(routeInfo.getAvgSpeed(), routeInfo.getSlowdownEvent(), input.getValue().getKey(), input.getTimestamp()); + return row; + } + } + + + /** + * This composite transformation extracts speed info from traffic station readings. + * It groups the readings by 'route' and analyzes traffic slowdown for that route. + * Lastly, it formats the results for JDBC. + */ + static class TrackSpeed extends + CompositeStreamTransform<WindowedStream<KeyValPair<String, StationSpeed>>, WindowedStream<OutputPojo>> + { + @Override + public WindowedStream<OutputPojo> compose(WindowedStream<KeyValPair<String, StationSpeed>> inputStream) + { + // Apply a GroupByKey transform to collect a list of all station + // readings for a given route. 
+ WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<StationSpeed>>>> timeGroup = + inputStream + .accumulateByKey(new Group<StationSpeed>(), new Function.ToKeyValue<KeyValPair<String, StationSpeed>, String, StationSpeed>() + { + @Override + public Tuple<KeyValPair<String, StationSpeed>> f(KeyValPair<String, StationSpeed> input) + { + return new Tuple.TimestampedTuple<>(input.getValue().getTimestamp(), input); + } + }, name("GroupByKey")); + + // Analyze 'slowdown' over the route readings. + WindowedStream<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> stats = timeGroup + .flatMap(new GatherStats(), name("GatherStats")); + + // Format the results for writing to JDBC table. + WindowedStream<OutputPojo> results = stats.map(new FormatStatsFn(), name("FormatStatsFn")); + + return results; + } + } + + + private static Double tryParseAvgSpeed(String[] inputItems) + { + try { + return Double.parseDouble(tryParseString(inputItems, 3)); + } catch (NumberFormatException e) { + return null; + } catch (NullPointerException e) { + return null; + } + } + + private static String tryParseStationType(String[] inputItems) + { + return tryParseString(inputItems, 2); + } + + private static String tryParseStationId(String[] inputItems) + { + return tryParseString(inputItems, 1); + } + + private static String tryParseTimestamp(String[] inputItems) + { + return tryParseString(inputItems, 0); + } + + private static String tryParseString(String[] inputItems, int index) + { + return inputItems.length >= index ? inputItems[index] : null; + } + + /** + * Define some small hard-wired San Diego 'routes' to track based on sensor station ID. 
+ */ + private static Map<String, String> buildStationInfo() + { + Map<String, String> stations = new Hashtable<String, String>(); + stations.put("1108413", "SDRoute1"); // from freeway 805 S + stations.put("1108699", "SDRoute2"); // from freeway 78 E + stations.put("1108702", "SDRoute2"); + return stations; + } + + /** + * A dummy generator to generate some traffic information. + */ + public static class InfoGen extends BaseOperator implements InputOperator + { + public transient DefaultOutputPort<String> output = new DefaultOutputPort<>(); + + private String[] stationTypes = new String[]{"ML", "BL", "GL"}; + private int[] stationIDs = new int[]{1108413, 1108699, 1108702}; + private double ave = 55.0; + private long timestamp; + private static final Duration RAND_RANGE = Duration.standardMinutes(10); + private static int tupleCount = 0; + + public static int getTupleCount() + { + return tupleCount; + } + + @Override + public void setup(Context.OperatorContext context) + { + tupleCount = 0; + timestamp = System.currentTimeMillis(); + } + + @Override + public void emitTuples() + { + for (String stationType : stationTypes) { + for (int stationID : stationIDs) { + double speed = Math.random() * 20 + ave; + long time = (long)(Math.random() * RAND_RANGE.getMillis()) + timestamp; + try { + output.emit(time + "," + stationID + "," + stationType + "," + speed); + tupleCount++; + + Thread.sleep(50); + } catch (Exception e) { + // Ignore it + } + } + } + } + } + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + InfoGen infoGen = new InfoGen(); + Collector collector = new Collector(); + + // Create a stream from the input operator. + ApexStream<Tuple.TimestampedTuple<String>> stream = StreamFactory.fromInput(infoGen, infoGen.output, name("infoGen")) + + // Extract the timestamp from the input and wrap it into a TimestampedTuple. + .map(new ExtractTimestamps(), name("ExtractTimestamps")); + + stream + // Extract the average speed of a station. 
+ .flatMap(new ExtractStationSpeedFn(), name("ExtractStationSpeedFn")) + + // Apply window and trigger option. + .window(new WindowOption.SlidingTimeWindows(Duration.standardMinutes(WINDOW_DURATION), Duration.standardMinutes(WINDOW_SLIDE_EVERY)), new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(5000)).accumulatingFiredPanes()) + + // Apply TrackSpeed composite transformation to compute the route information. + .addCompositeStreams(new TrackSpeed()) + + // print the result to console. + .print() + .endWith(collector, collector.input, name("Collector")) + .populateDag(dag); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java new file mode 100644 index 0000000..ecad622 --- /dev/null +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java @@ -0,0 +1,251 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.sample.complete; + +import java.nio.charset.Charset; +import java.nio.charset.CharsetEncoder; + +import java.util.LinkedList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.joda.time.Duration; + +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.Window; +import org.apache.apex.malhar.lib.window.WindowOption; + +import org.apache.apex.malhar.stream.api.ApexStream; +import org.apache.apex.malhar.stream.api.CompositeStreamTransform; +import org.apache.apex.malhar.stream.api.WindowedStream; +import org.apache.apex.malhar.stream.api.function.Function; +import org.apache.apex.malhar.stream.api.impl.StreamFactory; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.contrib.twitter.TwitterSampleInput; +import com.datatorrent.lib.util.KeyValPair; + +import static org.apache.apex.malhar.stream.api.Option.Options.name; + +/** + * Auto Complete Hashtag Demo with real-time Twitter input. In order to run this application, you need to create an app + * at https://apps.twitter.com, then generate your consumer and access keys and tokens, and enter that information + * accordingly in /resources/META-INF/properties.xml. + * + * Authentication requires the following four pieces of information: + * Your application consumer key, + * Your application consumer secret, + * Your twitter access token, and + * Your twitter access token secret. + */ +@ApplicationAnnotation(name = "TwitterAutoComplete") +public class TwitterAutoComplete implements StreamingApplication +{ + /** + * Check whether every character in a string can be encoded as ASCII.
+ */ + public static class StringUtils + { + static CharsetEncoder encoder = Charset.forName("US-ASCII").newEncoder(); + + public static boolean isAscii(String v) + { + return encoder.canEncode(v); + } + } + + /** + * FlapMap Function to extract all hashtags from a text form tweet. + */ + private static class ExtractHashtags implements Function.FlatMapFunction<String, String> + { + + @Override + public Iterable<String> f(String input) + { + List<String> result = new LinkedList<>(); + Matcher m = Pattern.compile("#\\S+").matcher(input); + while (m.find()) { + result.add(m.group().substring(1)); + } + return result; + } + } + + /** + * Lower latency, but more expensive. + */ + private static class ComputeTopFlat + extends CompositeStreamTransform<WindowedStream<CompletionCandidate>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>> + { + private final int candidatesPerPrefix; + private final int minPrefix; + + public ComputeTopFlat(int candidatesPerPrefix, int minPrefix) + { + this.candidatesPerPrefix = candidatesPerPrefix; + this.minPrefix = minPrefix; + } + + @Override + public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose( + WindowedStream<CompletionCandidate> input) + { + TopNByKey topNByKey = new TopNByKey(); + topNByKey.setN(candidatesPerPrefix); + return input + .<KeyValPair<String, CompletionCandidate>, WindowedStream<KeyValPair<String, CompletionCandidate>>>flatMap(new AllPrefixes(minPrefix, 3), name("Extract Prefixes")) + .accumulateByKey(topNByKey, new Function.ToKeyValue<KeyValPair<String, CompletionCandidate>, String, CompletionCandidate>() + { + @Override + public Tuple<KeyValPair<String, CompletionCandidate>> f(KeyValPair<String, CompletionCandidate> tuple) + { + // TODO: Should be removed after Auto-wrapping is supported. 
+ return new Tuple.WindowedTuple<>(Window.GLOBAL_WINDOW, tuple); + } + }, name("TopNByKey")); + } + } + + /** + * FlapMap Function to extract all prefixes of the hashtag in the input CompletionCandidate, and output + * KeyValPairs of the prefix and the CompletionCandidate + */ + private static class AllPrefixes implements Function.FlatMapFunction<CompletionCandidate, KeyValPair<String, CompletionCandidate>> + { + private final int minPrefix; + private final int maxPrefix; + + public AllPrefixes() + { + this(0, Integer.MAX_VALUE); + } + + public AllPrefixes(int minPrefix) + { + this(minPrefix, Integer.MAX_VALUE); + } + + public AllPrefixes(int minPrefix, int maxPrefix) + { + this.minPrefix = minPrefix; + this.maxPrefix = maxPrefix; + } + + @Override + public Iterable<KeyValPair<String, CompletionCandidate>> f(CompletionCandidate input) + { + List<KeyValPair<String, CompletionCandidate>> result = new LinkedList<>(); + String word = input.getValue(); + for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) { + result.add(new KeyValPair<>(input.getValue().substring(0, i).toLowerCase(), input)); + } + return result; + } + } + + /** + * A Composite stream transform that takes as input a list of tokens and returns + * the most common tokens per prefix. 
+ */
  public static class ComputeTopCompletions
      extends CompositeStreamTransform<WindowedStream<String>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>>
  {
    private final int candidatesPerPrefix;
    // Stored but not read anywhere in this class.
    private final boolean recursive;

    protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive)
    {
      this.candidatesPerPrefix = candidatesPerPrefix;
      this.recursive = recursive;
    }

    /**
     * Factory for this transform.
     *
     * @param candidatesPerPrefix number of candidates requested per prefix
     * @param recursive stored on the instance
     * @return a new transform
     */
    public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive)
    {
      return new ComputeTopCompletions(candidatesPerPrefix, recursive);
    }

    /**
     * Counts the incoming tokens, turns each (token, count) pair into a {@link CompletionCandidate} and
     * hands the candidates to {@link ComputeTopFlat} with a minimum prefix length of 1.
     */
    @Override
    @SuppressWarnings("unchecked")
    public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(WindowedStream<String> inputStream)
    {
      // Every token contributes a count of one.
      Function.ToKeyValue<String, String, Long> tokenToCount = new Function.ToKeyValue<String, String, Long>()
      {
        @Override
        public Tuple<KeyValPair<String, Long>> f(String input)
        {
          return new Tuple.PlainTuple<>(new KeyValPair<>(input, 1L));
        }
      };

      // Repackage a counted token as a candidate.
      Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, CompletionCandidate> countToCandidate =
          new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, CompletionCandidate>()
          {
            @Override
            public CompletionCandidate f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
            {
              KeyValPair<String, Long> counted = input.getValue();
              return new CompletionCandidate(counted.getKey(), counted.getValue());
            }
          };

      ApexStream<CompletionCandidate> candidates = inputStream
          .countByKey(tokenToCount, name("Hashtag Count"))
          .map(countToCandidate, name("KeyValPair to CompletionCandidate"));

      return candidates.addCompositeStreams(new ComputeTopFlat(candidatesPerPrefix, 1));
    }
  }

  /**
   * FilterFunction that keeps only the tweets for which {@link StringUtils#isAscii(String)} returns true.
   */
  static class ASCIIFilter implements Function.FilterFunction<String>
  {
    @Override
    public boolean f(String input)
    {
      return StringUtils.isAscii(input);
    }
  }

  /**
   * Populate the dag with the High-Level API: sample tweets, keep the ASCII ones, extract their hashtags,
   * window them globally and print the top completions per prefix.
   *
   * @param dag the DAG to populate
   * @param conf the configuration; not read by this method
   */
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    TwitterSampleInput tweetSource = new TwitterSampleInput();

    WindowOption globalWindow = new WindowOption.GlobalWindow();

    ApexStream<String> hashtags = StreamFactory.fromInput(tweetSource, tweetSource.text, name("tweetSampler"))
        .filter(new ASCIIFilter(), name("ACSII Filter"))
        .flatMap(new ExtractHashtags(), name("Extract Hashtags"));

    // Accumulate fired panes and fire early every ten seconds.
    TriggerOption everyTenSeconds = new TriggerOption()
        .accumulatingFiredPanes()
        .withEarlyFiringsAtEvery(Duration.standardSeconds(10));

    ApexStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> completions =
        hashtags.window(globalWindow, everyTenSeconds)
        .addCompositeStreams(ComputeTopCompletions.top(10, true))
        .print();

    completions.populateDag(dag);
  }
}
