http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java deleted file mode 100644 index 327c882..0000000 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java +++ /dev/null @@ -1,128 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.apex.malhar.stream.sample; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -import org.apache.apex.malhar.lib.window.TriggerOption; -import org.apache.apex.malhar.lib.window.Tuple; -import org.apache.apex.malhar.lib.window.WindowOption; -import org.apache.apex.malhar.stream.api.function.Function; -import org.apache.apex.malhar.stream.api.impl.StreamFactory; -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.Context; -import com.datatorrent.api.DAG; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.lib.util.KeyValPair; - -import static org.apache.apex.malhar.stream.api.Option.Options.name; - -/** - * Beam MinimalWordCount Example - * - * @since 3.5.0 - */ -@ApplicationAnnotation(name = "MinimalWordCount") -public class MinimalWordCount implements StreamingApplication -{ - public static class Collector extends BaseOperator - { - static Map<String, Long> result; - private static boolean done = false; - - public static boolean isDone() - { - return done; - } - - @Override - public void setup(Context.OperatorContext context) - { - done = false; - result = new HashMap<>(); - } - - public final transient DefaultInputPort<KeyValPair<String, Long>> input = new DefaultInputPort<KeyValPair<String, Long>>() - { - @Override - public void process(KeyValPair<String, Long> tuple) - { - if (tuple.getKey().equals("bye")) { - done = true; - } - result.put(tuple.getKey(), tuple.getValue()); - } - }; - } - - /** - * Populate the dag using High-Level API. - * @param dag - * @param conf - */ - @Override - public void populateDAG(DAG dag, Configuration conf) - { - Collector collector = new Collector(); - // Create a stream reading from a file line by line using StreamFactory. 
- StreamFactory.fromFolder("./src/test/resources/wordcount", name("textInput")) - // Use a flatmap transformation to extract words from the incoming stream of lines. - .flatMap(new Function.FlatMapFunction<String, String>() - { - @Override - public Iterable<String> f(String input) - { - return Arrays.asList(input.split("[^a-zA-Z']+")); - - } - }, name("ExtractWords")) - // Apply windowing to the stream for counting, in this case, the window option is global window. - .window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1)) - // Count the appearances of every word. - .countByKey(new Function.ToKeyValue<String, String, Long>() - { - @Override - public Tuple<KeyValPair<String, Long>> f(String input) - { - return new Tuple.PlainTuple<KeyValPair<String, Long>>(new KeyValPair<String, Long>(input, 1L)); - } - }, name("countByKey")) - // Format the counting result to a readable format by unwrapping the tuples. - .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, KeyValPair<String, Long>>() - { - @Override - public KeyValPair<String, Long> f(Tuple.WindowedTuple<KeyValPair<String, Long>> input) - { - return input.getValue(); - } - }, name("FormatResults")) - // Print the result. - .print(name("console")) - // Attach a collector to the stream to collect results. - .endWith(collector, collector.input, name("Collector")) - // populate the dag using the stream. - .populateDag(dag); - } -}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java deleted file mode 100644 index 5b83bd0..0000000 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java +++ /dev/null @@ -1,290 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.apex.malhar.stream.sample; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -import org.joda.time.Duration; -import org.apache.apex.malhar.lib.window.TriggerOption; -import org.apache.apex.malhar.lib.window.Tuple; -import org.apache.apex.malhar.lib.window.WindowOption; -import org.apache.apex.malhar.stream.api.ApexStream; -import org.apache.apex.malhar.stream.api.WindowedStream; -import org.apache.apex.malhar.stream.api.function.Function; -import org.apache.apex.malhar.stream.api.impl.StreamFactory; -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.conf.Configuration; - -import com.google.common.base.Throwables; -import com.datatorrent.api.Context; -import com.datatorrent.api.DAG; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.InputOperator; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.lib.util.KeyValPair; - -import static org.apache.apex.malhar.stream.api.Option.Options.name; - -/** - * Beam WindowedWordCount Example. - * - * @since 3.5.0 - */ -@ApplicationAnnotation(name = "WindowedWordCount") -public class WindowedWordCount implements StreamingApplication -{ - static final int WINDOW_SIZE = 1; // Default window duration in minutes - - /** - * An input operator that reads a file and emits it line by line downstream, with a time gap between - * every two lines.
- */ - public static class TextInput extends BaseOperator implements InputOperator - { - public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>(); - private boolean done = false; - - private transient BufferedReader reader; - - @Override - public void setup(Context.OperatorContext context) - { - done = false; - initReader(); - } - - private void initReader() - { - try { - InputStream resourceStream = this.getClass().getResourceAsStream("/wordcount/word.txt"); - reader = new BufferedReader(new InputStreamReader(resourceStream)); - } catch (Exception ex) { - throw Throwables.propagate(ex); - } - } - - @Override - public void teardown() - { - IOUtils.closeQuietly(reader); - } - - @Override - public void emitTuples() - { - if (!done) { - try { - String line = reader.readLine(); - if (line == null) { - done = true; - reader.close(); - } else { - this.output.emit(line); - } - Thread.sleep(50); - } catch (IOException ex) { - throw new RuntimeException(ex); - } catch (InterruptedException e) { - throw Throwables.propagate(e); - } - } - } - } - - public static class Collector extends BaseOperator - { - private static Map<KeyValPair<Long, String>, Long> result = new HashMap<>(); - private static boolean done = false; - - @Override - public void setup(Context.OperatorContext context) - { - super.setup(context); - done = false; - } - - public static boolean isDone() - { - return done; - } - - public static Map<KeyValPair<Long, String>, Long> getResult() - { - return result; - } - - public final transient DefaultInputPort<PojoEvent> input = new DefaultInputPort<PojoEvent>() - { - @Override - public void process(PojoEvent tuple) - { - result.put(new KeyValPair<Long, String>(tuple.getTimestamp(), tuple.getWord()), tuple.getCount()); - if (tuple.getWord().equals("bye")) { - done = true; - } - } - }; - } - - /** - * A Pojo Tuple class used for outputting result to JDBC. 
- */ - public static class PojoEvent - { - private String word; - private long count; - private long timestamp; - - @Override - public String toString() - { - return "PojoEvent (word=" + getWord() + ", count=" + getCount() + ", timestamp=" + getTimestamp() + ")"; - } - - public String getWord() - { - return word; - } - - public void setWord(String word) - { - this.word = word; - } - - public long getCount() - { - return count; - } - - public void setCount(long count) - { - this.count = count; - } - - public long getTimestamp() - { - return timestamp; - } - - public void setTimestamp(long timestamp) - { - this.timestamp = timestamp; - } - } - - /** - * A map function that wraps the input string with a randomly generated timestamp. - */ - public static class AddTimestampFn implements Function.MapFunction<String, Tuple.TimestampedTuple<String>> - { - private static final Duration RAND_RANGE = Duration.standardMinutes(10); - private final Long minTimestamp; - - AddTimestampFn() - { - this.minTimestamp = System.currentTimeMillis(); - } - - @Override - public Tuple.TimestampedTuple<String> f(String input) - { - // Generate a timestamp that falls within RAND_RANGE (10 minutes) after the time this function was constructed. - long randMillis = (long)(Math.random() * RAND_RANGE.getMillis()); - long randomTimestamp = minTimestamp + randMillis; - - return new Tuple.TimestampedTuple<>(randomTimestamp, input); - } - } - - /** A MapFunction that converts a Word and Count into a PojoEvent. */ - public static class FormatAsTableRowFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, PojoEvent> - { - @Override - public PojoEvent f(Tuple.WindowedTuple<KeyValPair<String, Long>> input) - { - PojoEvent row = new PojoEvent(); - row.setTimestamp(input.getTimestamp()); - row.setCount(input.getValue().getValue()); - row.setWord(input.getValue().getKey()); - return row; - } - } - - /** - * Populate dag with High-Level API.
- * @param dag - * @param conf - */ - @Override - public void populateDAG(DAG dag, Configuration conf) - { - TextInput input = new TextInput(); - Collector collector = new Collector(); - - // Create stream from the TextInput operator. - ApexStream<Tuple.TimestampedTuple<String>> stream = StreamFactory.fromInput(input, input.output, name("input")) - - // Extract all the words from the input line of text. - .flatMap(new Function.FlatMapFunction<String, String>() - { - @Override - public Iterable<String> f(String input) - { - return Arrays.asList(input.split("[\\p{Punct}\\s]+")); - } - }, name("ExtractWords")) - - // Wrap the word with a randomly generated timestamp. - .map(new AddTimestampFn(), name("AddTimestampFn")); - - - // apply window and trigger option. - // TODO: change trigger option to atWaterMark when available. - WindowedStream<Tuple.TimestampedTuple<String>> windowedWords = stream - .window(new WindowOption.TimeWindows(Duration.standardMinutes(WINDOW_SIZE)), - new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1)); - - - WindowedStream<PojoEvent> wordCounts = - // Perform a countByKey transformation to count the appearance of each word in every time window. - windowedWords.countByKey(new Function.ToKeyValue<Tuple.TimestampedTuple<String>, String, Long>() - { - @Override - public Tuple<KeyValPair<String, Long>> f(Tuple.TimestampedTuple<String> input) - { - return new Tuple.TimestampedTuple<KeyValPair<String, Long>>(input.getTimestamp(), - new KeyValPair<String, Long>(input.getValue(), 1L)); - } - }, name("count words")) - - // Format the output and print out the result. 
- .map(new FormatAsTableRowFn(), name("FormatAsTableRowFn")).print(name("console")); - - wordCounts.endWith(collector, collector.input, name("Collector")).populateDag(dag); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java deleted file mode 100644 index 2db59b6..0000000 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java +++ /dev/null @@ -1,324 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.apex.malhar.stream.sample.complete; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.apex.malhar.lib.window.TriggerOption; -import org.apache.apex.malhar.lib.window.Tuple; -import org.apache.apex.malhar.lib.window.Window; -import org.apache.apex.malhar.lib.window.WindowOption; -import org.apache.apex.malhar.stream.api.ApexStream; -import org.apache.apex.malhar.stream.api.CompositeStreamTransform; -import org.apache.apex.malhar.stream.api.WindowedStream; -import org.apache.apex.malhar.stream.api.function.Function; -import org.apache.apex.malhar.stream.api.impl.StreamFactory; -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.conf.Configuration; - -import com.google.common.base.Throwables; - -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.DAG; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.InputOperator; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.lib.util.KeyValPair; - -import static org.apache.apex.malhar.stream.api.Option.Options.name; - -/** - * An example that computes the most popular hash tags - * for every prefix, which can be used for auto-completion. - * This application is identical to TwitterAutoComplete, except it's - * reading from a file. This application is mainly for local testing - * purpose. - * - * <p>This will update the datastore every 10 seconds based on the last - * 30 minutes of data received. 
- * - * @since 3.5.0 - */ -@ApplicationAnnotation(name = "AutoComplete") -public class AutoComplete implements StreamingApplication -{ - - /** - * A dummy Twitter input operator. It reads from a text file containing some tweets and outputs one line - * per emitTuples() call, sleeping 50 milliseconds after each call. - */ - public static class TweetsInput extends BaseOperator implements InputOperator - { - public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>(); - private boolean done; - - private transient BufferedReader reader; - - @Override - public void setup(OperatorContext context) - { - done = false; - initReader(); - } - - private void initReader() - { - try { - InputStream resourceStream = this.getClass().getResourceAsStream("/sampletweets.txt"); - reader = new BufferedReader(new InputStreamReader(resourceStream)); - } catch (Exception ex) { - throw Throwables.propagate(ex); - } - } - - @Override - public void teardown() - { - IOUtils.closeQuietly(reader); - } - - @Override - public void emitTuples() - { - if (!done) { - try { - String line = reader.readLine(); - if (line == null) { - done = true; - reader.close(); - } else { - this.output.emit(line); - } - Thread.sleep(50); - } catch (IOException ex) { - throw new RuntimeException(ex); - } catch (InterruptedException e) { - // Ignore it.
- } - } - } - } - - public static class Collector extends BaseOperator - { - private static Map<String, List<CompletionCandidate>> result = new HashMap<>(); - private static boolean done = false; - - public static boolean isDone() - { - return done; - } - - @Override - public void setup(OperatorContext context) - { - super.setup(context); - done = false; - } - - public static Map<String, List<CompletionCandidate>> getResult() - { - return result; - } - - public final transient DefaultInputPort<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> input = new DefaultInputPort<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>() - { - @Override - public void process(Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>> tuple) - { - if (tuple.getValue().getKey().equals("yarn")) { - done = true; - } - result.put(tuple.getValue().getKey(), tuple.getValue().getValue()); - } - }; - } - - /** - * FlatMap function that extracts all hashtags from the text of a tweet. - */ - private static class ExtractHashtags implements Function.FlatMapFunction<String, String> - { - - @Override - public Iterable<String> f(String input) - { - List<String> result = new LinkedList<>(); - Matcher m = Pattern.compile("#\\S+").matcher(input); - while (m.find()) { - result.add(m.group().substring(1)); - } - return result; - } - } - - /** - * Lower latency, but more expensive.
- */ - private static class ComputeTopFlat - extends CompositeStreamTransform<WindowedStream<CompletionCandidate>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>> - { - private final int candidatesPerPrefix; - private final int minPrefix; - - public ComputeTopFlat(int candidatesPerPrefix, int minPrefix) - { - this.candidatesPerPrefix = candidatesPerPrefix; - this.minPrefix = minPrefix; - } - - @Override - public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose( - WindowedStream<CompletionCandidate> input) - { - return input - .<KeyValPair<String, CompletionCandidate>, WindowedStream<KeyValPair<String, CompletionCandidate>>>flatMap(new AllPrefixes(minPrefix)) - .accumulateByKey(new TopNByKey(), new Function.ToKeyValue<KeyValPair<String, CompletionCandidate>, String, - CompletionCandidate>() - { - @Override - public Tuple<KeyValPair<String, CompletionCandidate>> f(KeyValPair<String, CompletionCandidate> tuple) - { - // TODO: Should be removed after Auto-wrapping is supported. 
- return new Tuple.WindowedTuple<>(Window.GlobalWindow.INSTANCE, tuple); - } - }); - } - } - - /** - * FlatMap function that extracts all prefixes of the hashtag in the input CompletionCandidate and outputs - * KeyValPairs of the lower-cased prefix and the CompletionCandidate. - */ - private static class AllPrefixes implements Function.FlatMapFunction<CompletionCandidate, KeyValPair<String, CompletionCandidate>> - { - private final int minPrefix; - private final int maxPrefix; - - public AllPrefixes() - { - this(0, Integer.MAX_VALUE); - } - - public AllPrefixes(int minPrefix) - { - this(minPrefix, Integer.MAX_VALUE); - } - - public AllPrefixes(int minPrefix, int maxPrefix) - { - this.minPrefix = minPrefix; - this.maxPrefix = maxPrefix; - } - - @Override - public Iterable<KeyValPair<String, CompletionCandidate>> f(CompletionCandidate input) - { - List<KeyValPair<String, CompletionCandidate>> result = new LinkedList<>(); - String word = input.getValue(); - for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) { - - result.add(new KeyValPair<>(input.getValue().substring(0, i).toLowerCase(), input)); - } - return result; - } - } - - /** - * A Composite stream transform that takes as input a list of tokens and returns - * the most common tokens per prefix.
- */ - public static class ComputeTopCompletions - extends CompositeStreamTransform<WindowedStream<String>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>> - { - private final int candidatesPerPrefix; - private final boolean recursive; - - protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive) - { - this.candidatesPerPrefix = candidatesPerPrefix; - this.recursive = recursive; - } - - public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive) - { - return new ComputeTopCompletions(candidatesPerPrefix, recursive); - } - - @Override - @SuppressWarnings("unchecked") - public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(WindowedStream<String> inputStream) - { - ApexStream<CompletionCandidate> candidates = inputStream - .countByKey(new Function.ToKeyValue<String, String, Long>() - { - @Override - public Tuple<KeyValPair<String, Long>> f(String input) - { - return new Tuple.PlainTuple<>(new KeyValPair<>(input, 1L)); - } - }, name("countByKey")) - .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String,Long>>, CompletionCandidate>() - { - @Override - public CompletionCandidate f(Tuple.WindowedTuple<KeyValPair<String, Long>> input) - { - return new CompletionCandidate(input.getValue().getKey(), input.getValue().getValue()); - } - }, name("ToCompletionCandidate")); - - return candidates.addCompositeStreams(new ComputeTopFlat(10, 1)); - - } - } - - /** - * Populate the dag with High-Level API. 
- * @param dag - * @param conf - */ - @Override - public void populateDAG(DAG dag, Configuration conf) - { - TweetsInput input = new TweetsInput(); - Collector collector = new Collector(); - - WindowOption windowOption = new WindowOption.GlobalWindow(); - - ApexStream<String> tags = StreamFactory.fromInput(input, input.output, name("tweetSampler")) - .flatMap(new ExtractHashtags()); - - tags.window(windowOption, new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1)) - .addCompositeStreams(ComputeTopCompletions.top(10, true)).print(name("console")) - .endWith(collector, collector.input, name("collector")) - .populateDag(dag); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java deleted file mode 100644 index 991424e..0000000 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java +++ /dev/null @@ -1,89 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.stream.sample.complete; - -/** - * Class used to store tag-count pairs in Auto Complete Demo. - * - * @since 3.5.0 - */ -public class CompletionCandidate implements Comparable<CompletionCandidate> -{ - private long count; - private String value; - - public CompletionCandidate(String value, long count) - { - this.value = value; - this.count = count; - } - - public long getCount() - { - return count; - } - - public String getValue() - { - return value; - } - - // Empty constructor required for Kryo. - public CompletionCandidate() - { - - } - - @Override - public int compareTo(CompletionCandidate o) - { - if (this.count < o.count) { - return -1; - } else if (this.count == o.count) { - return this.value.compareTo(o.value); - } else { - return 1; - } - } - - @Override - public boolean equals(Object other) - { - if (other instanceof CompletionCandidate) { - CompletionCandidate that = (CompletionCandidate)other; - return this.count == that.count && this.value.equals(that.value); - } else { - return false; - } - } - - @Override - public int hashCode() - { - return Long.valueOf(count).hashCode() ^ value.hashCode(); - } - - @Override - public String toString() - { - return "CompletionCandidate[" + value + ", " + count + "]"; - } -} - - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java ---------------------------------------------------------------------- diff --git 
a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java deleted file mode 100644 index ee15d90..0000000 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.stream.sample.complete; - -/** - * Tuple Class for JdbcOutput of StreamingWordExtract. 
- * - * @since 3.5.0 - */ -public class PojoEvent extends Object -{ - private String stringValue; - - @Override - public String toString() - { - return "PojoEvent [stringValue=" + getStringValue() + "]"; - } - - public void setStringValue(String newString) - { - this.stringValue = newString; - } - - public String getStringValue() - { - return this.stringValue; - } -} - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java deleted file mode 100644 index 07f01d0..0000000 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java +++ /dev/null @@ -1,162 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.apex.malhar.stream.sample.complete; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import org.apache.apex.malhar.stream.api.ApexStream; -import org.apache.apex.malhar.stream.api.Option; -import org.apache.apex.malhar.stream.api.function.Function; -import org.apache.apex.malhar.stream.api.impl.StreamFactory; -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.DAG; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.lib.db.jdbc.JdbcFieldInfo; -import com.datatorrent.lib.db.jdbc.JdbcPOJOInsertOutputOperator; -import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore; - -import static java.sql.Types.VARCHAR; - -/** - * Beam StreamingWordExtract Example. - * - * @since 3.5.0 - */ -@ApplicationAnnotation(name = "StreamingWordExtract") -public class StreamingWordExtract implements StreamingApplication -{ - private static int wordCount = 0; // Counts the words that have been extracted. - private static int entriesMapped = 0; // Counts the entries that have been mapped. - - public int getWordCount() - { - return wordCount; - } - - public int getEntriesMapped() - { - return entriesMapped; - } - - /** - * A FlatMapFunction that tokenizes lines of text into individual words. - */ - public static class ExtractWords implements Function.FlatMapFunction<String, String> - { - @Override - public Iterable<String> f(String input) - { - List<String> result = new ArrayList<>(Arrays.asList(input.split("[^a-zA-Z0-9']+"))); - wordCount += result.size(); - return result; - } - } - - - /** - * A MapFunction that uppercases a word. - */ - public static class Uppercase implements Function.MapFunction<String, String> - { - @Override - public String f(String input) - { - return input.toUpperCase(); - } - } - - - /** - * A filter function to filter out empty strings.
- */ - public static class EmptyStringFilter implements Function.FilterFunction<String> - { - @Override - public boolean f(String input) - { - return !input.isEmpty(); - } - } - - - /** - * A map function to map the result string to a pojo entry. - */ - public static class PojoMapper implements Function.MapFunction<String, Object> - { - - @Override - public Object f(String input) - { - PojoEvent pojo = new PojoEvent(); - pojo.setStringValue(input); - entriesMapped++; - return pojo; - } - } - - /** - * Add field infos to the {@link JdbcPOJOInsertOutputOperator}. - */ - private static List<JdbcFieldInfo> addFieldInfos() - { - List<JdbcFieldInfo> fieldInfos = new ArrayList<>(); - fieldInfos.add(new JdbcFieldInfo("STRINGVALUE", "stringValue", JdbcFieldInfo.SupportType.STRING, VARCHAR)); - return fieldInfos; - } - - /** - * Populate dag with High-Level API. - * @param dag - * @param conf - */ - @Override - public void populateDAG(DAG dag, Configuration conf) - { - JdbcPOJOInsertOutputOperator jdbcOutput = new JdbcPOJOInsertOutputOperator(); - jdbcOutput.setFieldInfos(addFieldInfos()); - JdbcTransactionalStore outputStore = new JdbcTransactionalStore(); - jdbcOutput.setStore(outputStore); - jdbcOutput.setTablename("TestTable"); - - // Create a stream reading from a folder. - ApexStream<String> stream = StreamFactory.fromFolder("./src/test/resources/data"); - - // Extract all the words from the input line of text. - stream.flatMap(new ExtractWords()) - - // Filter out the empty strings. - .filter(new EmptyStringFilter()) - - // Change every word to uppercase. - .map(new Uppercase()) - - // Map the resulted word to a Pojo entry. - .map(new PojoMapper()) - - // Output the entries to JdbcOutput and insert them into a table. 
- .endWith(jdbcOutput, jdbcOutput.input, Option.Options.name("jdbcOutput")); - - stream.populateDag(dag); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopNByKey.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopNByKey.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopNByKey.java deleted file mode 100644 index c7ccae3..0000000 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopNByKey.java +++ /dev/null @@ -1,120 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.stream.sample.complete; - -import java.util.Comparator; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -import org.apache.apex.malhar.lib.window.Accumulation; - -/** - * Specialized TopNByKey accumulation for AutoComplete Demo. 
- * - * @since 3.5.0 - */ -public class TopNByKey implements - Accumulation<CompletionCandidate, Map<String, Long>, List<CompletionCandidate>> -{ - int n = 10; - - Comparator comparator; - - public void setN(int n) - { - this.n = n; - } - - public void setComparator(Comparator comparator) - { - this.comparator = comparator; - } - - @Override - public Map<String, Long> defaultAccumulatedValue() - { - return new HashMap<>(); - } - - /** - * Accumulate the input. Update the entry in the Accumulation Map if the key of the input is existed, create a - * new entry otherwise. - * @param accumulatedValue - * @param input - * @return - */ - @Override - public Map<String, Long> accumulate(Map<String, Long> accumulatedValue, CompletionCandidate input) - { - accumulatedValue.put(input.getValue(), input.getCount()); - return accumulatedValue; - } - - /** - * Merge two Maps together. For every key, keep the larger value in the resulted Map. - * @param accumulatedValue1 - * @param accumulatedValue2 - * @return - */ - @Override - public Map<String, Long> merge(Map<String, Long> accumulatedValue1, Map<String, Long> accumulatedValue2) - { - for (Map.Entry<String, Long> entry : accumulatedValue2.entrySet()) { - if (accumulatedValue1.containsKey(entry.getKey()) && accumulatedValue1.get(entry.getKey()) > entry.getValue()) { - continue; - } - accumulatedValue1.put(entry.getKey(), entry.getValue()); - } - return accumulatedValue1; - } - - /** - * Loop through the Accumulation Map to get the top n entries based on their values, return a list containing - * those entries. 
- * @param accumulatedValue - * @return - */ - @Override - public List<CompletionCandidate> getOutput(Map<String, Long> accumulatedValue) - { - LinkedList<CompletionCandidate> result = new LinkedList<>(); - for (Map.Entry<String, Long> entry : accumulatedValue.entrySet()) { - int k = 0; - for (CompletionCandidate inMemory : result) { - if (entry.getValue() > inMemory.getCount()) { - break; - } - k++; - } - result.add(k, new CompletionCandidate(entry.getKey(), entry.getValue())); - if (result.size() > n) { - result.remove(result.get(result.size() - 1)); - } - } - return result; - } - - @Override - public List<CompletionCandidate> getRetraction(List<CompletionCandidate> value) - { - return new LinkedList<>(); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java deleted file mode 100644 index 68ec733..0000000 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java +++ /dev/null @@ -1,347 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.stream.sample.complete; - -import java.util.ArrayList; -import java.util.Comparator; -import java.util.List; -import java.util.Random; -import javax.annotation.Nullable; - -import org.joda.time.Duration; - -import org.apache.apex.malhar.lib.window.TriggerOption; -import org.apache.apex.malhar.lib.window.Tuple; -import org.apache.apex.malhar.lib.window.Window; -import org.apache.apex.malhar.lib.window.WindowOption; -import org.apache.apex.malhar.lib.window.accumulation.TopN; -import org.apache.apex.malhar.stream.api.ApexStream; -import org.apache.apex.malhar.stream.api.CompositeStreamTransform; -import org.apache.apex.malhar.stream.api.WindowedStream; -import org.apache.apex.malhar.stream.api.function.Function; -import org.apache.apex.malhar.stream.api.impl.StreamFactory; -import org.apache.hadoop.conf.Configuration; - - -import com.datatorrent.api.Context; -import com.datatorrent.api.DAG; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.InputOperator; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.lib.util.KeyValPair; - -import static org.apache.apex.malhar.stream.api.Option.Options.name; - -/** - * Beam's TopWikipediaSessions Example. 
- * - * @since 3.5.0 - */ -@ApplicationAnnotation(name = "TopWikipediaSessions") -public class TopWikipediaSessions implements StreamingApplication -{ - /** - * A generator that outputs a stream of combinations of some users and some randomly generated edit time. - */ - public static class SessionGen extends BaseOperator implements InputOperator - { - private String[] names = new String[]{"user1", "user2", "user3", "user4"}; - public transient DefaultOutputPort<KeyValPair<String, Long>> output = new DefaultOutputPort<>(); - - private static final Duration RAND_RANGE = Duration.standardDays(365); - private Long minTimestamp; - private long sleepTime; - private static int tupleCount = 0; - - public static int getTupleCount() - { - return tupleCount; - } - - private String randomName(String[] names) - { - int index = new Random().nextInt(names.length); - return names[index]; - } - - @Override - public void setup(Context.OperatorContext context) - { - super.setup(context); - tupleCount = 0; - minTimestamp = System.currentTimeMillis(); - sleepTime = context.getValue(Context.OperatorContext.SPIN_MILLIS); - } - - @Override - public void emitTuples() - { - long randMillis = (long)(Math.random() * RAND_RANGE.getMillis()); - long randomTimestamp = minTimestamp + randMillis; - output.emit(new KeyValPair<String, Long>(randomName(names), randomTimestamp)); - tupleCount++; - try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - // Ignore it. 
- } - } - } - - public static class Collector extends BaseOperator - { - private final int resultSize = 5; - private static List<List<TempWrapper>> result = new ArrayList<>(); - - public static List<List<TempWrapper>> getResult() - { - return result; - } - - public final transient DefaultInputPort<Tuple.WindowedTuple<List<TempWrapper>>> input = new DefaultInputPort<Tuple.WindowedTuple<List<TempWrapper>>>() - { - @Override - public void process(Tuple.WindowedTuple<List<TempWrapper>> tuple) - { - if (result.size() == resultSize) { - result.remove(0); - } - result.add(tuple.getValue()); - } - }; - } - - - /** - * Convert the upstream (user, time) combination to a timestamped tuple of user. - */ - static class ExtractUserAndTimestamp implements Function.MapFunction<KeyValPair<String, Long>, Tuple.TimestampedTuple<String>> - { - @Override - public Tuple.TimestampedTuple<String> f(KeyValPair<String, Long> input) - { - long timestamp = input.getValue(); - String userName = input.getKey(); - - // Sets the implicit timestamp field to be used in windowing. - return new Tuple.TimestampedTuple<>(timestamp, userName); - - } - } - - /** - * Computes the number of edits in each user session. A session is defined as - * a string of edits where each is separated from the next by less than an hour. - */ - static class ComputeSessions - extends CompositeStreamTransform<ApexStream<Tuple.TimestampedTuple<String>>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, Long>>>> - { - @Override - public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, Long>>> compose(ApexStream<Tuple.TimestampedTuple<String>> inputStream) - { - return inputStream - - // Chuck the stream into session windows. - .window(new WindowOption.SessionWindows(Duration.standardHours(1)), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1)) - - // Count the number of edits for a user within one session. 
- .countByKey(new Function.ToKeyValue<Tuple.TimestampedTuple<String>, String, Long>() - { - @Override - public Tuple.TimestampedTuple<KeyValPair<String, Long>> f(Tuple.TimestampedTuple<String> input) - { - return new Tuple.TimestampedTuple<KeyValPair<String, Long>>(input.getTimestamp(), new KeyValPair<String, Long>(input.getValue(), 1L)); - } - }, name("ComputeSessions")); - } - } - - /** - * A comparator class used for comparing two TempWrapper objects. - */ - public static class Comp implements Comparator<TempWrapper> - { - @Override - public int compare(TempWrapper o1, TempWrapper o2) - { - return Long.compare(o1.getValue().getValue(), o2.getValue().getValue()); - } - } - - /** - * A function to extract timestamp from a TempWrapper object. - */ - // TODO: Need to revisit and change back to using TimestampedTuple. - public static class TimestampExtractor implements com.google.common.base.Function<TempWrapper, Long> - { - @Override - public Long apply(@Nullable TempWrapper input) - { - return input.getTimestamp(); - } - } - - /** - * A temporary wrapper to wrap a KeyValPair and a timestamp together to represent a timestamped tuple, the reason - * for this is that we cannot resolve a type conflict when calling accumulate(). After the issue resolved, we can - * remove this class. 
- */ - public static class TempWrapper - { - private KeyValPair<String, Long> value; - private Long timestamp; - - public TempWrapper() - { - - } - - public TempWrapper(KeyValPair<String, Long> value, Long timestamp) - { - this.value = value; - this.timestamp = timestamp; - } - - @Override - public String toString() - { - return this.value + " - " + this.timestamp; - } - - public Long getTimestamp() - { - return timestamp; - } - - public void setTimestamp(Long timestamp) - { - this.timestamp = timestamp; - } - - public KeyValPair<String, Long> getValue() - { - return value; - } - - public void setValue(KeyValPair<String, Long> value) - { - this.value = value; - } - } - - /** - * Computes the longest session ending in each month, in this case we use 30 days to represent every month. - */ - private static class TopPerMonth - extends CompositeStreamTransform<ApexStream<Tuple.WindowedTuple<KeyValPair<String, Long>>>, WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>>> - { - - @Override - public WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>> compose(ApexStream<Tuple.WindowedTuple<KeyValPair<String, Long>>> inputStream) - { - TopN<TempWrapper> topN = new TopN<>(); - topN.setN(10); - topN.setComparator(new Comp()); - - return inputStream - - // Map the input WindowedTuple to a TempWrapper object. - .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, TempWrapper>() - { - @Override - public TempWrapper f(Tuple.WindowedTuple<KeyValPair<String, Long>> input) - { - Window window = input.getWindows().iterator().next(); - return new TempWrapper(input.getValue(), window.getBeginTimestamp()); - } - }, name("TempWrapper")) - - // Apply window and trigger option again, this time chuck the stream into fixed time windows. 
- .window(new WindowOption.TimeWindows(Duration.standardDays(30)), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(5))) - - // Compute the top 10 user-sessions with most number of edits. - .accumulate(topN, name("TopN")).with("timestampExtractor", new TimestampExtractor()); - } - } - - /** - * A map function that combine the user and his/her edit session together to a string and use that string as a key - * with number of edits in that session as value to create a new key value pair to send to downstream. - */ - static class SessionsToStringsDoFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, Tuple.WindowedTuple<KeyValPair<String, Long>>> - { - @Override - public Tuple.WindowedTuple<KeyValPair<String, Long>> f(Tuple.WindowedTuple<KeyValPair<String, Long>> input) - { - Window window = input.getWindows().iterator().next(); - return new Tuple.WindowedTuple<KeyValPair<String, Long>>(window, new KeyValPair<String, Long>( - input.getValue().getKey() + " : " + window.getBeginTimestamp() + " : " + window.getDurationMillis(), - input.getValue().getValue())); - } - } - - /** - * A flatmap function that turns the result into readable format. - */ - static class FormatOutputDoFn implements Function.FlatMapFunction<Tuple.WindowedTuple<List<TempWrapper>>, String> - { - @Override - public Iterable<String> f(Tuple.WindowedTuple<List<TempWrapper>> input) - { - ArrayList<String> result = new ArrayList<>(); - for (TempWrapper item : input.getValue()) { - String session = item.getValue().getKey(); - long count = item.getValue().getValue(); - Window window = input.getWindows().iterator().next(); - result.add(session + " + " + count + " : " + window.getBeginTimestamp()); - } - return result; - } - } - - /** - * A composite transform that compute the top wikipedia sessions. 
- */ - public static class ComputeTopSessions extends CompositeStreamTransform<ApexStream<KeyValPair<String, Long>>, WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>>> - { - @Override - public WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>> compose(ApexStream<KeyValPair<String, Long>> inputStream) - { - return inputStream - .map(new ExtractUserAndTimestamp(), name("ExtractUserAndTimestamp")) - .addCompositeStreams(new ComputeSessions()) - .map(new SessionsToStringsDoFn(), name("SessionsToStringsDoFn")) - .addCompositeStreams(new TopPerMonth()); - } - } - - @Override - public void populateDAG(DAG dag, Configuration conf) - { - SessionGen sg = new SessionGen(); - Collector collector = new Collector(); - StreamFactory.fromInput(sg, sg.output, name("sessionGen")) - .addCompositeStreams(new ComputeTopSessions()) - .print(name("console")) - .endWith(collector, collector.input, name("collector")).populateDag(dag); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java deleted file mode 100644 index e6a53d6..0000000 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java +++ /dev/null @@ -1,523 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.stream.sample.complete; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Hashtable; -import java.util.List; -import java.util.Map; - -import javax.annotation.Nullable; - -import org.joda.time.Duration; - -import org.apache.apex.malhar.lib.window.TriggerOption; -import org.apache.apex.malhar.lib.window.Tuple; -import org.apache.apex.malhar.lib.window.WindowOption; -import org.apache.apex.malhar.lib.window.accumulation.Group; -import org.apache.apex.malhar.stream.api.ApexStream; -import org.apache.apex.malhar.stream.api.CompositeStreamTransform; -import org.apache.apex.malhar.stream.api.WindowedStream; -import org.apache.apex.malhar.stream.api.function.Function; -import org.apache.apex.malhar.stream.api.impl.StreamFactory; -import org.apache.hadoop.conf.Configuration; - -import com.google.common.collect.Lists; - -import com.datatorrent.api.Context; -import com.datatorrent.api.DAG; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.InputOperator; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.lib.util.KeyValPair; - -import static org.apache.apex.malhar.stream.api.Option.Options.name; - -/** - * Beam's TrafficRoutes example. 
- * - * @since 3.5.0 - */ -@ApplicationAnnotation(name = "TrafficRoutes") -public class TrafficRoutes implements StreamingApplication -{ - static Map<String, String> sdStations = buildStationInfo(); - static final int WINDOW_DURATION = 3; // Default sliding window duration in minutes - static final int WINDOW_SLIDE_EVERY = 1; // Default window 'slide every' setting in minutes - - /** - * This class holds information about a station reading's average speed. - */ - public static class StationSpeed implements Comparable<StationSpeed> - { - @Nullable - String stationId; - @Nullable - Double avgSpeed; - @Nullable - Long timestamp; - - public StationSpeed() {} - - public StationSpeed(String stationId, Double avgSpeed, Long timestamp) - { - this.stationId = stationId; - this.avgSpeed = avgSpeed; - this.timestamp = timestamp; - } - - public void setAvgSpeed(@Nullable Double avgSpeed) - { - this.avgSpeed = avgSpeed; - } - - public void setStationId(@Nullable String stationId) - { - this.stationId = stationId; - } - - public void setTimestamp(@Nullable Long timestamp) - { - this.timestamp = timestamp; - } - - @Nullable - public Long getTimestamp() - { - return timestamp; - } - - public String getStationId() - { - return this.stationId; - } - - public Double getAvgSpeed() - { - return this.avgSpeed; - } - - @Override - public int compareTo(StationSpeed other) - { - return Long.compare(this.timestamp, other.timestamp); - } - } - - /** - * This class holds information about a route's speed/slowdown. 
- */ - static class RouteInfo - { - @Nullable - String route; - @Nullable - Double avgSpeed; - @Nullable - Boolean slowdownEvent; - - public RouteInfo() - { - - } - - public RouteInfo(String route, Double avgSpeed, Boolean slowdownEvent) - { - this.route = route; - this.avgSpeed = avgSpeed; - this.slowdownEvent = slowdownEvent; - } - - public String getRoute() - { - return this.route; - } - - public Double getAvgSpeed() - { - return this.avgSpeed; - } - - public Boolean getSlowdownEvent() - { - return this.slowdownEvent; - } - } - - /** - * Extract the timestamp field from the input string, and wrap the input string in a {@link Tuple.TimestampedTuple} - * with the extracted timestamp. - */ - static class ExtractTimestamps implements Function.MapFunction<String, Tuple.TimestampedTuple<String>> - { - - @Override - public Tuple.TimestampedTuple<String> f(String input) - { - String[] items = input.split(","); - String timestamp = tryParseTimestamp(items); - - return new Tuple.TimestampedTuple<>(Long.parseLong(timestamp), input); - } - } - - /** - * Filter out readings for the stations along predefined 'routes', and output - * (station, speed info) keyed on route. - */ - static class ExtractStationSpeedFn implements Function.FlatMapFunction<Tuple.TimestampedTuple<String>, KeyValPair<String, StationSpeed>> - { - - @Override - public Iterable<KeyValPair<String, StationSpeed>> f(Tuple.TimestampedTuple<String> input) - { - - ArrayList<KeyValPair<String, StationSpeed>> result = new ArrayList<>(); - String[] items = input.getValue().split(","); - String stationType = tryParseStationType(items); - // For this analysis, use only 'main line' station types - if (stationType != null && stationType.equals("ML")) { - Double avgSpeed = tryParseAvgSpeed(items); - String stationId = tryParseStationId(items); - // For this simple example, filter out everything but some hardwired routes. 
- if (avgSpeed != null && stationId != null && sdStations.containsKey(stationId)) { - StationSpeed stationSpeed = - new StationSpeed(stationId, avgSpeed, input.getTimestamp()); - // The tuple key is the 'route' name stored in the 'sdStations' hash. - KeyValPair<String, StationSpeed> outputValue = new KeyValPair<>(sdStations.get(stationId), stationSpeed); - result.add(outputValue); - } - } - return result; - } - } - - /** - * For a given route, track average speed for the window. Calculate whether - * traffic is currently slowing down, via a predefined threshold. If a supermajority of - * speeds in this sliding window are less than the previous reading we call this a 'slowdown'. - * Note: these calculations are for example purposes only, and are unrealistic and oversimplified. - */ - static class GatherStats - implements Function.FlatMapFunction<Tuple.WindowedTuple<KeyValPair<String, List<StationSpeed>>>, Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> - { - @Override - public Iterable<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> f(Tuple.WindowedTuple<KeyValPair<String, List<StationSpeed>>> input) - { - ArrayList<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> result = new ArrayList<>(); - String route = input.getValue().getKey(); - double speedSum = 0.0; - int speedCount = 0; - int speedups = 0; - int slowdowns = 0; - List<StationSpeed> infoList = Lists.newArrayList(input.getValue().getValue()); - // StationSpeeds sort by embedded timestamp. - Collections.sort(infoList); - Map<String, Double> prevSpeeds = new HashMap<>(); - // For all stations in the route, sum (non-null) speeds. Keep a count of the non-null speeds. 
- for (StationSpeed item : infoList) { - Double speed = item.getAvgSpeed(); - if (speed != null) { - speedSum += speed; - speedCount++; - Double lastSpeed = prevSpeeds.get(item.getStationId()); - if (lastSpeed != null) { - if (lastSpeed < speed) { - speedups += 1; - } else { - slowdowns += 1; - } - } - prevSpeeds.put(item.getStationId(), speed); - } - } - if (speedCount == 0) { - // No average to compute. - return result; - } - double speedAvg = speedSum / speedCount; - boolean slowdownEvent = slowdowns >= 2 * speedups; - RouteInfo routeInfo = new RouteInfo(route, speedAvg, slowdownEvent); - result.add(new Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>(input.getTimestamp(), new KeyValPair<String, RouteInfo>(route, routeInfo))); - return result; - } - } - - /** - * Output Pojo class for outputting result to JDBC. - */ - static class OutputPojo - { - private Double avgSpeed; - private Boolean slowdownEvent; - private String key; - private Long timestamp; - - public OutputPojo() - { - } - - public OutputPojo(Double avgSpeed, Boolean slowdownEvent, String key, Long timestamp) - { - this.avgSpeed = avgSpeed; - this.slowdownEvent = slowdownEvent; - this.key = key; - this.timestamp = timestamp; - } - - @Override - public String toString() - { - return key + " + " + avgSpeed + " + " + slowdownEvent + " + " + timestamp; - } - - public void setTimestamp(Long timestamp) - { - this.timestamp = timestamp; - } - - public Long getTimestamp() - { - return timestamp; - } - - public void setAvgSpeed(Double avgSpeed) - { - this.avgSpeed = avgSpeed; - } - - public Double getAvgSpeed() - { - return avgSpeed; - } - - public void setKey(String key) - { - this.key = key; - } - - public String getKey() - { - return key; - } - - public void setSlowdownEvent(Boolean slowdownEvent) - { - this.slowdownEvent = slowdownEvent; - } - - public Boolean getSlowdownEvent() - { - return slowdownEvent; - } - - } - - public static class Collector extends BaseOperator - { - private static 
Map<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> result = new HashMap<>(); - - public static Map<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> getResult() - { - return result; - } - - public final transient DefaultInputPort<OutputPojo> input = new DefaultInputPort<OutputPojo>() - { - @Override - public void process(OutputPojo tuple) - { - result.put(new KeyValPair<Long, String>(tuple.getTimestamp(), tuple.getKey()), new KeyValPair<Double, Boolean>(tuple.getAvgSpeed(), tuple.getSlowdownEvent())); - } - }; - } - - /** - * Format the results of the slowdown calculations to a OutputPojo. - */ - static class FormatStatsFn implements Function.MapFunction<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>, OutputPojo> - { - @Override - public OutputPojo f(Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>> input) - { - RouteInfo routeInfo = input.getValue().getValue(); - OutputPojo row = new OutputPojo(routeInfo.getAvgSpeed(), routeInfo.getSlowdownEvent(), input.getValue().getKey(), input.getTimestamp()); - return row; - } - } - - - /** - * This composite transformation extracts speed info from traffic station readings. - * It groups the readings by 'route' and analyzes traffic slowdown for that route. - * Lastly, it formats the results for JDBC. - */ - static class TrackSpeed extends - CompositeStreamTransform<WindowedStream<KeyValPair<String, StationSpeed>>, WindowedStream<OutputPojo>> - { - @Override - public WindowedStream<OutputPojo> compose(WindowedStream<KeyValPair<String, StationSpeed>> inputStream) - { - // Apply a GroupByKey transform to collect a list of all station - // readings for a given route. 
- WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<StationSpeed>>>> timeGroup = - inputStream - .accumulateByKey(new Group<StationSpeed>(), new Function.ToKeyValue<KeyValPair<String, StationSpeed>, String, StationSpeed>() - { - @Override - public Tuple<KeyValPair<String, StationSpeed>> f(KeyValPair<String, StationSpeed> input) - { - return new Tuple.TimestampedTuple<>(input.getValue().getTimestamp(), input); - } - }, name("GroupByKey")); - - // Analyze 'slowdown' over the route readings. - WindowedStream<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> stats = timeGroup - .flatMap(new GatherStats(), name("GatherStats")); - - // Format the results for writing to JDBC table. - WindowedStream<OutputPojo> results = stats.map(new FormatStatsFn(), name("FormatStatsFn")); - - return results; - } - } - - - private static Double tryParseAvgSpeed(String[] inputItems) - { - try { - return Double.parseDouble(tryParseString(inputItems, 3)); - } catch (NumberFormatException e) { - return null; - } catch (NullPointerException e) { - return null; - } - } - - private static String tryParseStationType(String[] inputItems) - { - return tryParseString(inputItems, 2); - } - - private static String tryParseStationId(String[] inputItems) - { - return tryParseString(inputItems, 1); - } - - private static String tryParseTimestamp(String[] inputItems) - { - return tryParseString(inputItems, 0); - } - - private static String tryParseString(String[] inputItems, int index) - { - return inputItems.length >= index ? inputItems[index] : null; - } - - /** - * Define some small hard-wired San Diego 'routes' to track based on sensor station ID. 
- */ - private static Map<String, String> buildStationInfo() - { - Map<String, String> stations = new Hashtable<String, String>(); - stations.put("1108413", "SDRoute1"); // from freeway 805 S - stations.put("1108699", "SDRoute2"); // from freeway 78 E - stations.put("1108702", "SDRoute2"); - return stations; - } - - /** - * A dummy generator to generate some traffic information. - */ - public static class InfoGen extends BaseOperator implements InputOperator - { - public transient DefaultOutputPort<String> output = new DefaultOutputPort<>(); - - private String[] stationTypes = new String[]{"ML", "BL", "GL"}; - private int[] stationIDs = new int[]{1108413, 1108699, 1108702}; - private double ave = 55.0; - private long timestamp; - private static final Duration RAND_RANGE = Duration.standardMinutes(10); - private static int tupleCount = 0; - - public static int getTupleCount() - { - return tupleCount; - } - - @Override - public void setup(Context.OperatorContext context) - { - tupleCount = 0; - timestamp = System.currentTimeMillis(); - } - - @Override - public void emitTuples() - { - for (String stationType : stationTypes) { - for (int stationID : stationIDs) { - double speed = Math.random() * 20 + ave; - long time = (long)(Math.random() * RAND_RANGE.getMillis()) + timestamp; - try { - output.emit(time + "," + stationID + "," + stationType + "," + speed); - tupleCount++; - - Thread.sleep(50); - } catch (Exception e) { - // Ignore it - } - } - } - } - } - - @Override - public void populateDAG(DAG dag, Configuration conf) - { - InfoGen infoGen = new InfoGen(); - Collector collector = new Collector(); - - // Create a stream from the input operator. - ApexStream<Tuple.TimestampedTuple<String>> stream = StreamFactory.fromInput(infoGen, infoGen.output, name("infoGen")) - - // Extract the timestamp from the input and wrap it into a TimestampedTuple. - .map(new ExtractTimestamps(), name("ExtractTimestamps")); - - stream - // Extract the average speed of a station. 
- .flatMap(new ExtractStationSpeedFn(), name("ExtractStationSpeedFn")) - - // Apply window and trigger option. - .window(new WindowOption.SlidingTimeWindows(Duration.standardMinutes(WINDOW_DURATION), Duration.standardMinutes(WINDOW_SLIDE_EVERY)), new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(5000)).accumulatingFiredPanes()) - - // Apply TrackSpeed composite transformation to compute the route information. - .addCompositeStreams(new TrackSpeed()) - - // print the result to console. - .print(name("console")) - .endWith(collector, collector.input, name("Collector")) - .populateDag(dag); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java deleted file mode 100644 index 4fc80ea..0000000 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java +++ /dev/null @@ -1,254 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.stream.sample.complete; - -import java.nio.charset.Charset; -import java.nio.charset.CharsetEncoder; - -import java.util.LinkedList; -import java.util.List; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.joda.time.Duration; - -import org.apache.apex.malhar.lib.window.TriggerOption; -import org.apache.apex.malhar.lib.window.Tuple; -import org.apache.apex.malhar.lib.window.Window; -import org.apache.apex.malhar.lib.window.WindowOption; - -import org.apache.apex.malhar.stream.api.ApexStream; -import org.apache.apex.malhar.stream.api.CompositeStreamTransform; -import org.apache.apex.malhar.stream.api.WindowedStream; -import org.apache.apex.malhar.stream.api.function.Function; -import org.apache.apex.malhar.stream.api.impl.StreamFactory; - -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.DAG; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.contrib.twitter.TwitterSampleInput; -import com.datatorrent.lib.util.KeyValPair; - -import static org.apache.apex.malhar.stream.api.Option.Options.name; - -/** - * Auto Complete Hashtag Demo with real time twitter input. In order to run this application, you need to create an app - * at https://apps.twitter.com, then generate your consumer and access keys and tokens, and enter those information - * accordingly in /resources/META-INF/properties.xml. - * - * The authentication requires following 4 information. - * Your application consumer key, - * Your application consumer secret, - * Your twitter access token, and - * Your twitter access token secret. 
*
 * @since 3.5.0
 */
@ApplicationAnnotation(name = "TwitterAutoComplete")
public class TwitterAutoComplete implements StreamingApplication
{
  /**
   * Checks whether every character in a string can be encoded as US-ASCII.
   */
  public static class StringUtils
  {
    // NOTE(review): CharsetEncoder instances are documented as not safe for concurrent use and
    // this one is shared JVM-wide; confirm isAscii is only ever called from one thread at a time.
    static CharsetEncoder encoder = Charset.forName("US-ASCII").newEncoder();

    /**
     * @param v the text to check
     * @return {@code true} if the shared US-ASCII encoder can encode {@code v}
     */
    public static boolean isAscii(String v)
    {
      return encoder.canEncode(v);
    }
  }

  /**
   * FlatMap function to extract all hashtags from the text of a tweet.
   */
  private static class ExtractHashtags implements Function.FlatMapFunction<String, String>
  {
    /** A '#' followed by one or more non-whitespace characters; compiled once, not per tuple. */
    private static final Pattern HASHTAG = Pattern.compile("#\\S+");

    /**
     * @param input the tweet text
     * @return every hashtag found in {@code input}, in order of appearance, without the leading '#'
     */
    @Override
    public Iterable<String> f(String input)
    {
      List<String> hashtags = new LinkedList<>();
      Matcher matcher = HASHTAG.matcher(input);
      while (matcher.find()) {
        // Drop the leading '#'.
        hashtags.add(matcher.group().substring(1));
      }
      return hashtags;
    }
  }

  /**
   * Lower latency, but more expensive.
   *
   * Expands every candidate into its prefixes and accumulates the candidates per prefix
   * with a {@code TopNByKey} accumulation limited to {@code candidatesPerPrefix}.
   */
  private static class ComputeTopFlat
      extends CompositeStreamTransform<WindowedStream<CompletionCandidate>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>>
  {
    /** Longest prefix length generated for a candidate. */
    private static final int MAX_PREFIX_LENGTH = 3;

    private final int candidatesPerPrefix;
    private final int minPrefix;

    /**
     * @param candidatesPerPrefix the N handed to the {@code TopNByKey} accumulation
     * @param minPrefix the shortest prefix length to generate
     */
    public ComputeTopFlat(int candidatesPerPrefix, int minPrefix)
    {
      this.candidatesPerPrefix = candidatesPerPrefix;
      this.minPrefix = minPrefix;
    }

    @Override
    public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(
        WindowedStream<CompletionCandidate> input)
    {
      TopNByKey topNByKey = new TopNByKey();
      topNByKey.setN(candidatesPerPrefix);
      return input
        .<KeyValPair<String, CompletionCandidate>, WindowedStream<KeyValPair<String, CompletionCandidate>>>flatMap(new AllPrefixes(minPrefix, MAX_PREFIX_LENGTH), name("Extract Prefixes"))
        .accumulateByKey(topNByKey, new Function.ToKeyValue<KeyValPair<String, CompletionCandidate>, String, CompletionCandidate>()
        {
          @Override
          public Tuple<KeyValPair<String, CompletionCandidate>> f(KeyValPair<String, CompletionCandidate> tuple)
          {
            // TODO: Should be removed after Auto-wrapping is supported.
            return new Tuple.WindowedTuple<>(Window.GlobalWindow.INSTANCE, tuple);
          }
        }, name("TopNByKey"));
    }
  }

  /**
   * FlatMap function to extract all prefixes of the hashtag in the input CompletionCandidate, and output
   * KeyValPairs of the prefix and the CompletionCandidate.
   */
  private static class AllPrefixes implements Function.FlatMapFunction<CompletionCandidate, KeyValPair<String, CompletionCandidate>>
  {
    private final int minPrefix;
    private final int maxPrefix;

    /** Generates every prefix, from the empty prefix up to the whole word. */
    public AllPrefixes()
    {
      this(0, Integer.MAX_VALUE);
    }

    /**
     * @param minPrefix the shortest prefix length to generate
     */
    public AllPrefixes(int minPrefix)
    {
      this(minPrefix, Integer.MAX_VALUE);
    }

    /**
     * @param minPrefix the shortest prefix length to generate
     * @param maxPrefix the longest prefix length to generate
     */
    public AllPrefixes(int minPrefix, int maxPrefix)
    {
      this.minPrefix = minPrefix;
      this.maxPrefix = maxPrefix;
    }

    /**
     * @param input the candidate whose value is split into prefixes
     * @return one (lower-cased prefix, candidate) pair for each prefix length from
     *         {@code minPrefix} to the smaller of the word length and {@code maxPrefix};
     *         empty when {@code minPrefix} exceeds that bound
     */
    @Override
    public Iterable<KeyValPair<String, CompletionCandidate>> f(CompletionCandidate input)
    {
      List<KeyValPair<String, CompletionCandidate>> result = new LinkedList<>();
      String word = input.getValue();
      int longest = Math.min(word.length(), maxPrefix);
      for (int length = minPrefix; length <= longest; length++) {
        // Locale.ROOT keeps the prefix keys identical on every JVM; the default locale can map
        // ASCII letters to non-ASCII ones (e.g. 'I' under a Turkish locale).
        String prefix = word.substring(0, length).toLowerCase(java.util.Locale.ROOT);
        result.add(new KeyValPair<>(prefix, input));
      }
      return result;
    }
  }

  /**
   * A Composite stream transform that takes as input a list of tokens and returns
   * the most common tokens per prefix.
   */
  public static class ComputeTopCompletions
      extends CompositeStreamTransform<WindowedStream<String>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>>
  {
    private final int candidatesPerPrefix;
    // Stored but not read by compose().
    private final boolean recursive;

    protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive)
    {
      this.candidatesPerPrefix = candidatesPerPrefix;
      this.recursive = recursive;
    }

    /**
     * Factory for the transform.
     *
     * @param candidatesPerPrefix how many candidates to keep for each prefix
     * @param recursive stored on the transform; compose() does not consult it
     * @return a new transform instance
     */
    public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive)
    {
      return new ComputeTopCompletions(candidatesPerPrefix, recursive);
    }

    /**
     * Counts the tokens by key, turns each (token, count) pair into a CompletionCandidate and
     * hands the candidates to {@link ComputeTopFlat} with a minimum prefix length of 1.
     */
    @Override
    @SuppressWarnings("unchecked")
    public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(WindowedStream<String> inputStream)
    {

      ApexStream<CompletionCandidate> candidates = inputStream
          .countByKey(new Function.ToKeyValue<String, String, Long>()
          {
            @Override
            public Tuple<KeyValPair<String, Long>> f(String input)
            {
              return new Tuple.PlainTuple<>(new KeyValPair<>(input, 1L));
            }
          }, name("Hashtag Count"))
          .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String,Long>>, CompletionCandidate>()
          {
            @Override
            public CompletionCandidate f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
            {
              return new CompletionCandidate(input.getValue().getKey(), input.getValue().getValue());
            }
          }, name("KeyValPair to CompletionCandidate"));

      return candidates.addCompositeStreams(new ComputeTopFlat(candidatesPerPrefix, 1));

    }
  }

  /**
   * FilterFunction to filter out tweets with non-ASCII characters.
   */
  static class ASCIIFilter implements Function.FilterFunction<String>
  {
    /**
     * @param input the tweet text
     * @return {@code true} if {@link StringUtils#isAscii} accepts {@code input}
     */
    @Override
    public boolean f(String input)
    {
      return StringUtils.isAscii(input);
    }
  }

  /**
   * Populate the dag with High-Level API.
   *
   * Reads from the {@code text} port of a TwitterSampleInput, applies the ASCII filter,
   * extracts the hashtags, windows them with a global window whose trigger fires early every
   * 10 seconds with accumulating panes, computes the top 10 completions per prefix and prints
   * the result.
   *
   * @param dag the DAG to populate
   * @param conf the configuration; not read by this method
   */
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    TwitterSampleInput input = new TwitterSampleInput();

    WindowOption windowOption = new WindowOption.GlobalWindow();

    ApexStream<String> tags = StreamFactory.fromInput(input, input.text, name("tweetSampler"))
        .filter(new ASCIIFilter(), name("ACSII Filter"))
        .flatMap(new ExtractHashtags(), name("Extract Hashtags"));

    ApexStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> s =
        tags.window(windowOption, new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(10)))
        .addCompositeStreams(ComputeTopCompletions.top(10, true)).print();

    s.populateDag(dag);
  }
}
