http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java ---------------------------------------------------------------------- diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java new file mode 100644 index 0000000..2db59b6 --- /dev/null +++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java @@ -0,0 +1,324 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.stream.sample.complete; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.Window; +import org.apache.apex.malhar.lib.window.WindowOption; +import org.apache.apex.malhar.stream.api.ApexStream; +import org.apache.apex.malhar.stream.api.CompositeStreamTransform; +import org.apache.apex.malhar.stream.api.WindowedStream; +import org.apache.apex.malhar.stream.api.function.Function; +import org.apache.apex.malhar.stream.api.impl.StreamFactory; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; + +import com.google.common.base.Throwables; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.util.KeyValPair; + +import static org.apache.apex.malhar.stream.api.Option.Options.name; + +/** + * An example that computes the most popular hash tags + * for every prefix, which can be used for auto-completion. + * This application is identical to TwitterAutoComplete, except it's + * reading from a file. This application is mainly for local testing + * purpose. + * + * <p>This will update the datastore every 10 seconds based on the last + * 30 minutes of data received. 
+ * + * @since 3.5.0 + */ +@ApplicationAnnotation(name = "AutoComplete") +public class AutoComplete implements StreamingApplication +{ + + /** + * A dummy Twitter input operator. It reads from a text file containing some tweets and output a line every + * half of a second. + */ + public static class TweetsInput extends BaseOperator implements InputOperator + { + public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>(); + private boolean done; + + private transient BufferedReader reader; + + @Override + public void setup(OperatorContext context) + { + done = false; + initReader(); + } + + private void initReader() + { + try { + InputStream resourceStream = this.getClass().getResourceAsStream("/sampletweets.txt"); + reader = new BufferedReader(new InputStreamReader(resourceStream)); + } catch (Exception ex) { + throw Throwables.propagate(ex); + } + } + + @Override + public void teardown() + { + IOUtils.closeQuietly(reader); + } + + @Override + public void emitTuples() + { + if (!done) { + try { + String line = reader.readLine(); + if (line == null) { + done = true; + reader.close(); + } else { + this.output.emit(line); + } + Thread.sleep(50); + } catch (IOException ex) { + throw new RuntimeException(ex); + } catch (InterruptedException e) { + // Ignore it. 
+ } + } + } + } + + public static class Collector extends BaseOperator + { + private static Map<String, List<CompletionCandidate>> result = new HashMap<>(); + private static boolean done = false; + + public static boolean isDone() + { + return done; + } + + @Override + public void setup(OperatorContext context) + { + super.setup(context); + done = false; + } + + public static Map<String, List<CompletionCandidate>> getResult() + { + return result; + } + + public final transient DefaultInputPort<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> input = new DefaultInputPort<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>() + { + @Override + public void process(Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>> tuple) + { + if (tuple.getValue().getKey().equals("yarn")) { + done = true; + } + result.put(tuple.getValue().getKey(), tuple.getValue().getValue()); + } + }; + } + + /** + * FlapMap Function to extract all hashtags from a text form tweet. + */ + private static class ExtractHashtags implements Function.FlatMapFunction<String, String> + { + + @Override + public Iterable<String> f(String input) + { + List<String> result = new LinkedList<>(); + Matcher m = Pattern.compile("#\\S+").matcher(input); + while (m.find()) { + result.add(m.group().substring(1)); + } + return result; + } + } + + /** + * Lower latency, but more expensive. 
+ */ + private static class ComputeTopFlat + extends CompositeStreamTransform<WindowedStream<CompletionCandidate>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>> + { + private final int candidatesPerPrefix; + private final int minPrefix; + + public ComputeTopFlat(int candidatesPerPrefix, int minPrefix) + { + this.candidatesPerPrefix = candidatesPerPrefix; + this.minPrefix = minPrefix; + } + + @Override + public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose( + WindowedStream<CompletionCandidate> input) + { + return input + .<KeyValPair<String, CompletionCandidate>, WindowedStream<KeyValPair<String, CompletionCandidate>>>flatMap(new AllPrefixes(minPrefix)) + .accumulateByKey(new TopNByKey(), new Function.ToKeyValue<KeyValPair<String, CompletionCandidate>, String, + CompletionCandidate>() + { + @Override + public Tuple<KeyValPair<String, CompletionCandidate>> f(KeyValPair<String, CompletionCandidate> tuple) + { + // TODO: Should be removed after Auto-wrapping is supported. 
+ return new Tuple.WindowedTuple<>(Window.GlobalWindow.INSTANCE, tuple); + } + }); + } + } + + /** + * FlapMap Function to extract all prefixes of the hashtag in the input CompletionCandidate, and output + * KeyValPairs of the prefix and the CompletionCandidate + */ + private static class AllPrefixes implements Function.FlatMapFunction<CompletionCandidate, KeyValPair<String, CompletionCandidate>> + { + private final int minPrefix; + private final int maxPrefix; + + public AllPrefixes() + { + this(0, Integer.MAX_VALUE); + } + + public AllPrefixes(int minPrefix) + { + this(minPrefix, Integer.MAX_VALUE); + } + + public AllPrefixes(int minPrefix, int maxPrefix) + { + this.minPrefix = minPrefix; + this.maxPrefix = maxPrefix; + } + + @Override + public Iterable<KeyValPair<String, CompletionCandidate>> f(CompletionCandidate input) + { + List<KeyValPair<String, CompletionCandidate>> result = new LinkedList<>(); + String word = input.getValue(); + for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) { + + result.add(new KeyValPair<>(input.getValue().substring(0, i).toLowerCase(), input)); + } + return result; + } + } + + /** + * A Composite stream transform that takes as input a list of tokens and returns + * the most common tokens per prefix. 
+ */ + public static class ComputeTopCompletions + extends CompositeStreamTransform<WindowedStream<String>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>> + { + private final int candidatesPerPrefix; + private final boolean recursive; + + protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive) + { + this.candidatesPerPrefix = candidatesPerPrefix; + this.recursive = recursive; + } + + public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive) + { + return new ComputeTopCompletions(candidatesPerPrefix, recursive); + } + + @Override + @SuppressWarnings("unchecked") + public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(WindowedStream<String> inputStream) + { + ApexStream<CompletionCandidate> candidates = inputStream + .countByKey(new Function.ToKeyValue<String, String, Long>() + { + @Override + public Tuple<KeyValPair<String, Long>> f(String input) + { + return new Tuple.PlainTuple<>(new KeyValPair<>(input, 1L)); + } + }, name("countByKey")) + .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String,Long>>, CompletionCandidate>() + { + @Override + public CompletionCandidate f(Tuple.WindowedTuple<KeyValPair<String, Long>> input) + { + return new CompletionCandidate(input.getValue().getKey(), input.getValue().getValue()); + } + }, name("ToCompletionCandidate")); + + return candidates.addCompositeStreams(new ComputeTopFlat(10, 1)); + + } + } + + /** + * Populate the dag with High-Level API. 
+ * @param dag + * @param conf + */ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + TweetsInput input = new TweetsInput(); + Collector collector = new Collector(); + + WindowOption windowOption = new WindowOption.GlobalWindow(); + + ApexStream<String> tags = StreamFactory.fromInput(input, input.output, name("tweetSampler")) + .flatMap(new ExtractHashtags()); + + tags.window(windowOption, new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1)) + .addCompositeStreams(ComputeTopCompletions.top(10, true)).print(name("console")) + .endWith(collector, collector.input, name("collector")) + .populateDag(dag); + } +}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java ---------------------------------------------------------------------- diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java new file mode 100644 index 0000000..bd5c511 --- /dev/null +++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.sample.complete; + +/** + * Class used to store tag-count pairs in Auto Complete Example. 
+ * + * @since 3.5.0 + */ +public class CompletionCandidate implements Comparable<CompletionCandidate> +{ + private long count; + private String value; + + public CompletionCandidate(String value, long count) + { + this.value = value; + this.count = count; + } + + public long getCount() + { + return count; + } + + public String getValue() + { + return value; + } + + // Empty constructor required for Kryo. + public CompletionCandidate() + { + + } + + @Override + public int compareTo(CompletionCandidate o) + { + if (this.count < o.count) { + return -1; + } else if (this.count == o.count) { + return this.value.compareTo(o.value); + } else { + return 1; + } + } + + @Override + public boolean equals(Object other) + { + if (other instanceof CompletionCandidate) { + CompletionCandidate that = (CompletionCandidate)other; + return this.count == that.count && this.value.equals(that.value); + } else { + return false; + } + } + + @Override + public int hashCode() + { + return Long.valueOf(count).hashCode() ^ value.hashCode(); + } + + @Override + public String toString() + { + return "CompletionCandidate[" + value + ", " + count + "]"; + } +} + + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java ---------------------------------------------------------------------- diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java new file mode 100644 index 0000000..ee15d90 --- /dev/null +++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.sample.complete; + +/** + * Tuple Class for JdbcOutput of StreamingWordExtract. + * + * @since 3.5.0 + */ +public class PojoEvent extends Object +{ + private String stringValue; + + @Override + public String toString() + { + return "PojoEvent [stringValue=" + getStringValue() + "]"; + } + + public void setStringValue(String newString) + { + this.stringValue = newString; + } + + public String getStringValue() + { + return this.stringValue; + } +} + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java ---------------------------------------------------------------------- diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java new file mode 100644 index 0000000..07f01d0 --- /dev/null +++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.sample.complete; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.apex.malhar.stream.api.ApexStream; +import org.apache.apex.malhar.stream.api.Option; +import org.apache.apex.malhar.stream.api.function.Function; +import org.apache.apex.malhar.stream.api.impl.StreamFactory; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.lib.db.jdbc.JdbcFieldInfo; +import com.datatorrent.lib.db.jdbc.JdbcPOJOInsertOutputOperator; +import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore; + +import static java.sql.Types.VARCHAR; + +/** + * Beam StreamingWordExtract Example. + * + * @since 3.5.0 + */ +@ApplicationAnnotation(name = "StreamingWordExtract") +public class StreamingWordExtract implements StreamingApplication +{ + private static int wordCount = 0; // A counter to count number of words have been extracted. + private static int entriesMapped = 0; // A counter to count number of entries have been mapped. + + public int getWordCount() + { + return wordCount; + } + + public int getEntriesMapped() + { + return entriesMapped; + } + + /** + * A MapFunction that tokenizes lines of text into individual words. 
+ */ + public static class ExtractWords implements Function.FlatMapFunction<String, String> + { + @Override + public Iterable<String> f(String input) + { + List<String> result = new ArrayList<>(Arrays.asList(input.split("[^a-zA-Z0-9']+"))); + wordCount += result.size(); + return result; + } + } + + + /** + * A MapFunction that uppercases a word. + */ + public static class Uppercase implements Function.MapFunction<String, String> + { + @Override + public String f(String input) + { + return input.toUpperCase(); + } + } + + + /** + * A filter function to filter out empty strings. + */ + public static class EmptyStringFilter implements Function.FilterFunction<String> + { + @Override + public boolean f(String input) + { + return !input.isEmpty(); + } + } + + + /** + * A map function to map the result string to a pojo entry. + */ + public static class PojoMapper implements Function.MapFunction<String, Object> + { + + @Override + public Object f(String input) + { + PojoEvent pojo = new PojoEvent(); + pojo.setStringValue(input); + entriesMapped++; + return pojo; + } + } + + /** + * Add field infos to the {@link JdbcPOJOInsertOutputOperator}. + */ + private static List<JdbcFieldInfo> addFieldInfos() + { + List<JdbcFieldInfo> fieldInfos = new ArrayList<>(); + fieldInfos.add(new JdbcFieldInfo("STRINGVALUE", "stringValue", JdbcFieldInfo.SupportType.STRING, VARCHAR)); + return fieldInfos; + } + + /** + * Populate dag with High-Level API. + * @param dag + * @param conf + */ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + JdbcPOJOInsertOutputOperator jdbcOutput = new JdbcPOJOInsertOutputOperator(); + jdbcOutput.setFieldInfos(addFieldInfos()); + JdbcTransactionalStore outputStore = new JdbcTransactionalStore(); + jdbcOutput.setStore(outputStore); + jdbcOutput.setTablename("TestTable"); + + // Create a stream reading from a folder. 
+ ApexStream<String> stream = StreamFactory.fromFolder("./src/test/resources/data"); + + // Extract all the words from the input line of text. + stream.flatMap(new ExtractWords()) + + // Filter out the empty strings. + .filter(new EmptyStringFilter()) + + // Change every word to uppercase. + .map(new Uppercase()) + + // Map the resulted word to a Pojo entry. + .map(new PojoMapper()) + + // Output the entries to JdbcOutput and insert them into a table. + .endWith(jdbcOutput, jdbcOutput.input, Option.Options.name("jdbcOutput")); + + stream.populateDag(dag); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopNByKey.java ---------------------------------------------------------------------- diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopNByKey.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopNByKey.java new file mode 100644 index 0000000..937254c --- /dev/null +++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopNByKey.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.apex.malhar.stream.sample.complete;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.apex.malhar.lib.window.Accumulation;

/**
 * Specialized TopNByKey accumulation for AutoComplete Example.
 *
 * <p>The accumulated state maps each candidate value to a count. {@link #getOutput(Map)} turns that state into the
 * {@code n} candidates with the highest counts.
 *
 * @since 3.5.0
 */
public class TopNByKey implements
    Accumulation<CompletionCandidate, Map<String, Long>, List<CompletionCandidate>>
{
  /**
   * Orders candidates by descending count and breaks ties by ascending value, so the output order does not depend
   * on hash-map iteration order.
   */
  private static final Comparator<CompletionCandidate> HIGHEST_COUNT_FIRST = new Comparator<CompletionCandidate>()
  {
    @Override
    public int compare(CompletionCandidate left, CompletionCandidate right)
    {
      int byCount = Long.compare(right.getCount(), left.getCount());
      return byCount != 0 ? byCount : left.getValue().compareTo(right.getValue());
    }
  };

  // Maximum number of candidates returned by getOutput; non-positive values yield an empty list.
  int n = 10;

  // NOTE(review): stored but not consulted by getOutput, which always orders by descending count.
  Comparator comparator;

  public void setN(int n)
  {
    this.n = n;
  }

  public void setComparator(Comparator comparator)
  {
    this.comparator = comparator;
  }

  @Override
  public Map<String, Long> defaultAccumulatedValue()
  {
    return new HashMap<>();
  }

  /**
   * Accumulate the input. Records the input's count under its value, replacing any count previously stored for
   * that value.
   * @param accumulatedValue the map being accumulated into (mutated and returned)
   * @param input the candidate to record
   * @return {@code accumulatedValue}
   */
  @Override
  public Map<String, Long> accumulate(Map<String, Long> accumulatedValue, CompletionCandidate input)
  {
    accumulatedValue.put(input.getValue(), input.getCount());
    return accumulatedValue;
  }

  /**
   * Merge two Maps together. For every key, keep the larger value in the resulting Map.
   * @param accumulatedValue1 the map merged into (mutated and returned)
   * @param accumulatedValue2 the map merged from (not modified)
   * @return {@code accumulatedValue1}
   */
  @Override
  public Map<String, Long> merge(Map<String, Long> accumulatedValue1, Map<String, Long> accumulatedValue2)
  {
    for (Map.Entry<String, Long> entry : accumulatedValue2.entrySet()) {
      // Single lookup instead of containsKey + get.
      Long existing = accumulatedValue1.get(entry.getKey());
      if (existing == null || existing <= entry.getValue()) {
        accumulatedValue1.put(entry.getKey(), entry.getValue());
      }
    }
    return accumulatedValue1;
  }

  /**
   * Returns the top {@code n} entries of the Accumulation Map by value (count), highest first, with ties ordered
   * by ascending key.
   * @param accumulatedValue map from candidate value to count
   * @return a new list of at most {@code n} candidates
   */
  @Override
  public List<CompletionCandidate> getOutput(Map<String, Long> accumulatedValue)
  {
    List<CompletionCandidate> candidates = new ArrayList<>(accumulatedValue.size());
    for (Map.Entry<String, Long> entry : accumulatedValue.entrySet()) {
      candidates.add(new CompletionCandidate(entry.getKey(), entry.getValue()));
    }
    Collections.sort(candidates, HIGHEST_COUNT_FIRST);
    int limit = Math.max(0, Math.min(n, candidates.size()));
    // Copy rather than return a subList view, so the result is an independent, serializable list.
    return new ArrayList<>(candidates.subList(0, limit));
  }

  @Override
  public List<CompletionCandidate> getRetraction(List<CompletionCandidate> value)
  {
    return new ArrayList<>();
  }
}
// http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
// ----------------------------------------------------------------------
// diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
// new file mode 100644
// index 0000000..68ec733
// --- /dev/null
// +++
b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java @@ -0,0 +1,347 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.sample.complete; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Random; +import javax.annotation.Nullable; + +import org.joda.time.Duration; + +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.Window; +import org.apache.apex.malhar.lib.window.WindowOption; +import org.apache.apex.malhar.lib.window.accumulation.TopN; +import org.apache.apex.malhar.stream.api.ApexStream; +import org.apache.apex.malhar.stream.api.CompositeStreamTransform; +import org.apache.apex.malhar.stream.api.WindowedStream; +import org.apache.apex.malhar.stream.api.function.Function; +import org.apache.apex.malhar.stream.api.impl.StreamFactory; +import org.apache.hadoop.conf.Configuration; + + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import 
com.datatorrent.api.InputOperator; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.util.KeyValPair; + +import static org.apache.apex.malhar.stream.api.Option.Options.name; + +/** + * Beam's TopWikipediaSessions Example. + * + * @since 3.5.0 + */ +@ApplicationAnnotation(name = "TopWikipediaSessions") +public class TopWikipediaSessions implements StreamingApplication +{ + /** + * A generator that outputs a stream of combinations of some users and some randomly generated edit time. + */ + public static class SessionGen extends BaseOperator implements InputOperator + { + private String[] names = new String[]{"user1", "user2", "user3", "user4"}; + public transient DefaultOutputPort<KeyValPair<String, Long>> output = new DefaultOutputPort<>(); + + private static final Duration RAND_RANGE = Duration.standardDays(365); + private Long minTimestamp; + private long sleepTime; + private static int tupleCount = 0; + + public static int getTupleCount() + { + return tupleCount; + } + + private String randomName(String[] names) + { + int index = new Random().nextInt(names.length); + return names[index]; + } + + @Override + public void setup(Context.OperatorContext context) + { + super.setup(context); + tupleCount = 0; + minTimestamp = System.currentTimeMillis(); + sleepTime = context.getValue(Context.OperatorContext.SPIN_MILLIS); + } + + @Override + public void emitTuples() + { + long randMillis = (long)(Math.random() * RAND_RANGE.getMillis()); + long randomTimestamp = minTimestamp + randMillis; + output.emit(new KeyValPair<String, Long>(randomName(names), randomTimestamp)); + tupleCount++; + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + // Ignore it. 
+ } + } + } + + public static class Collector extends BaseOperator + { + private final int resultSize = 5; + private static List<List<TempWrapper>> result = new ArrayList<>(); + + public static List<List<TempWrapper>> getResult() + { + return result; + } + + public final transient DefaultInputPort<Tuple.WindowedTuple<List<TempWrapper>>> input = new DefaultInputPort<Tuple.WindowedTuple<List<TempWrapper>>>() + { + @Override + public void process(Tuple.WindowedTuple<List<TempWrapper>> tuple) + { + if (result.size() == resultSize) { + result.remove(0); + } + result.add(tuple.getValue()); + } + }; + } + + + /** + * Convert the upstream (user, time) combination to a timestamped tuple of user. + */ + static class ExtractUserAndTimestamp implements Function.MapFunction<KeyValPair<String, Long>, Tuple.TimestampedTuple<String>> + { + @Override + public Tuple.TimestampedTuple<String> f(KeyValPair<String, Long> input) + { + long timestamp = input.getValue(); + String userName = input.getKey(); + + // Sets the implicit timestamp field to be used in windowing. + return new Tuple.TimestampedTuple<>(timestamp, userName); + + } + } + + /** + * Computes the number of edits in each user session. A session is defined as + * a string of edits where each is separated from the next by less than an hour. + */ + static class ComputeSessions + extends CompositeStreamTransform<ApexStream<Tuple.TimestampedTuple<String>>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, Long>>>> + { + @Override + public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, Long>>> compose(ApexStream<Tuple.TimestampedTuple<String>> inputStream) + { + return inputStream + + // Chuck the stream into session windows. + .window(new WindowOption.SessionWindows(Duration.standardHours(1)), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1)) + + // Count the number of edits for a user within one session. 
+ .countByKey(new Function.ToKeyValue<Tuple.TimestampedTuple<String>, String, Long>() + { + @Override + public Tuple.TimestampedTuple<KeyValPair<String, Long>> f(Tuple.TimestampedTuple<String> input) + { + return new Tuple.TimestampedTuple<KeyValPair<String, Long>>(input.getTimestamp(), new KeyValPair<String, Long>(input.getValue(), 1L)); + } + }, name("ComputeSessions")); + } + } + + /** + * A comparator class used for comparing two TempWrapper objects. + */ + public static class Comp implements Comparator<TempWrapper> + { + @Override + public int compare(TempWrapper o1, TempWrapper o2) + { + return Long.compare(o1.getValue().getValue(), o2.getValue().getValue()); + } + } + + /** + * A function to extract timestamp from a TempWrapper object. + */ + // TODO: Need to revisit and change back to using TimestampedTuple. + public static class TimestampExtractor implements com.google.common.base.Function<TempWrapper, Long> + { + @Override + public Long apply(@Nullable TempWrapper input) + { + return input.getTimestamp(); + } + } + + /** + * A temporary wrapper to wrap a KeyValPair and a timestamp together to represent a timestamped tuple, the reason + * for this is that we cannot resolve a type conflict when calling accumulate(). After the issue resolved, we can + * remove this class. 
+ */ + public static class TempWrapper + { + private KeyValPair<String, Long> value; + private Long timestamp; + + public TempWrapper() + { + + } + + public TempWrapper(KeyValPair<String, Long> value, Long timestamp) + { + this.value = value; + this.timestamp = timestamp; + } + + @Override + public String toString() + { + return this.value + " - " + this.timestamp; + } + + public Long getTimestamp() + { + return timestamp; + } + + public void setTimestamp(Long timestamp) + { + this.timestamp = timestamp; + } + + public KeyValPair<String, Long> getValue() + { + return value; + } + + public void setValue(KeyValPair<String, Long> value) + { + this.value = value; + } + } + + /** + * Computes the longest session ending in each month, in this case we use 30 days to represent every month. + */ + private static class TopPerMonth + extends CompositeStreamTransform<ApexStream<Tuple.WindowedTuple<KeyValPair<String, Long>>>, WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>>> + { + + @Override + public WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>> compose(ApexStream<Tuple.WindowedTuple<KeyValPair<String, Long>>> inputStream) + { + TopN<TempWrapper> topN = new TopN<>(); + topN.setN(10); + topN.setComparator(new Comp()); + + return inputStream + + // Map the input WindowedTuple to a TempWrapper object. + .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, TempWrapper>() + { + @Override + public TempWrapper f(Tuple.WindowedTuple<KeyValPair<String, Long>> input) + { + Window window = input.getWindows().iterator().next(); + return new TempWrapper(input.getValue(), window.getBeginTimestamp()); + } + }, name("TempWrapper")) + + // Apply window and trigger option again, this time chuck the stream into fixed time windows. 
+ .window(new WindowOption.TimeWindows(Duration.standardDays(30)), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(5))) + + // Compute the top 10 user-sessions with most number of edits. + .accumulate(topN, name("TopN")).with("timestampExtractor", new TimestampExtractor()); + } + } + + /** + * A map function that combine the user and his/her edit session together to a string and use that string as a key + * with number of edits in that session as value to create a new key value pair to send to downstream. + */ + static class SessionsToStringsDoFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, Tuple.WindowedTuple<KeyValPair<String, Long>>> + { + @Override + public Tuple.WindowedTuple<KeyValPair<String, Long>> f(Tuple.WindowedTuple<KeyValPair<String, Long>> input) + { + Window window = input.getWindows().iterator().next(); + return new Tuple.WindowedTuple<KeyValPair<String, Long>>(window, new KeyValPair<String, Long>( + input.getValue().getKey() + " : " + window.getBeginTimestamp() + " : " + window.getDurationMillis(), + input.getValue().getValue())); + } + } + + /** + * A flatmap function that turns the result into readable format. + */ + static class FormatOutputDoFn implements Function.FlatMapFunction<Tuple.WindowedTuple<List<TempWrapper>>, String> + { + @Override + public Iterable<String> f(Tuple.WindowedTuple<List<TempWrapper>> input) + { + ArrayList<String> result = new ArrayList<>(); + for (TempWrapper item : input.getValue()) { + String session = item.getValue().getKey(); + long count = item.getValue().getValue(); + Window window = input.getWindows().iterator().next(); + result.add(session + " + " + count + " : " + window.getBeginTimestamp()); + } + return result; + } + } + + /** + * A composite transform that compute the top wikipedia sessions. 
+ */ + public static class ComputeTopSessions extends CompositeStreamTransform<ApexStream<KeyValPair<String, Long>>, WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>>> + { + @Override + public WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>> compose(ApexStream<KeyValPair<String, Long>> inputStream) + { + return inputStream + .map(new ExtractUserAndTimestamp(), name("ExtractUserAndTimestamp")) + .addCompositeStreams(new ComputeSessions()) + .map(new SessionsToStringsDoFn(), name("SessionsToStringsDoFn")) + .addCompositeStreams(new TopPerMonth()); + } + } + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + SessionGen sg = new SessionGen(); + Collector collector = new Collector(); + StreamFactory.fromInput(sg, sg.output, name("sessionGen")) + .addCompositeStreams(new ComputeTopSessions()) + .print(name("console")) + .endWith(collector, collector.input, name("collector")).populateDag(dag); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java ---------------------------------------------------------------------- diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java new file mode 100644 index 0000000..e6a53d6 --- /dev/null +++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java @@ -0,0 +1,523 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.sample.complete; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.List; +import java.util.Map; + +import javax.annotation.Nullable; + +import org.joda.time.Duration; + +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.WindowOption; +import org.apache.apex.malhar.lib.window.accumulation.Group; +import org.apache.apex.malhar.stream.api.ApexStream; +import org.apache.apex.malhar.stream.api.CompositeStreamTransform; +import org.apache.apex.malhar.stream.api.WindowedStream; +import org.apache.apex.malhar.stream.api.function.Function; +import org.apache.apex.malhar.stream.api.impl.StreamFactory; +import org.apache.hadoop.conf.Configuration; + +import com.google.common.collect.Lists; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.util.KeyValPair; + +import static org.apache.apex.malhar.stream.api.Option.Options.name; + +/** + * Beam's TrafficRoutes example. 
+ * + * @since 3.5.0 + */
@ApplicationAnnotation(name = "TrafficRoutes")
public class TrafficRoutes implements StreamingApplication
{
  // Station id -> route name for the hard-wired routes this example tracks.
  static Map<String, String> sdStations = buildStationInfo();
  static final int WINDOW_DURATION = 3;  // Default sliding window duration in minutes
  static final int WINDOW_SLIDE_EVERY = 1;  // Default window 'slide every' setting in minutes

  /**
   * This class holds information about a station reading's average speed.
   */
  public static class StationSpeed implements Comparable<StationSpeed>
  {
    @Nullable
    String stationId;
    @Nullable
    Double avgSpeed;
    @Nullable
    Long timestamp;

    public StationSpeed() {}

    public StationSpeed(String stationId, Double avgSpeed, Long timestamp)
    {
      this.stationId = stationId;
      this.avgSpeed = avgSpeed;
      this.timestamp = timestamp;
    }

    public void setAvgSpeed(@Nullable Double avgSpeed)
    {
      this.avgSpeed = avgSpeed;
    }

    public void setStationId(@Nullable String stationId)
    {
      this.stationId = stationId;
    }

    public void setTimestamp(@Nullable Long timestamp)
    {
      this.timestamp = timestamp;
    }

    @Nullable
    public Long getTimestamp()
    {
      return timestamp;
    }

    public String getStationId()
    {
      return this.stationId;
    }

    public Double getAvgSpeed()
    {
      return this.avgSpeed;
    }

    /**
     * Orders readings by timestamp. Both timestamps are unboxed, so both must be non-null.
     */
    @Override
    public int compareTo(StationSpeed other)
    {
      return Long.compare(this.timestamp, other.timestamp);
    }
  }

  /**
   * This class holds information about a route's speed/slowdown.
   */
  static class RouteInfo
  {
    @Nullable
    String route;
    @Nullable
    Double avgSpeed;
    @Nullable
    Boolean slowdownEvent;

    public RouteInfo()
    {

    }

    public RouteInfo(String route, Double avgSpeed, Boolean slowdownEvent)
    {
      this.route = route;
      this.avgSpeed = avgSpeed;
      this.slowdownEvent = slowdownEvent;
    }

    public String getRoute()
    {
      return this.route;
    }

    public Double getAvgSpeed()
    {
      return this.avgSpeed;
    }

    public Boolean getSlowdownEvent()
    {
      return this.slowdownEvent;
    }
  }

  /**
   * Extract the timestamp field (field 0 of the comma-separated record) from the input string, and wrap the input
   * string in a {@link Tuple.TimestampedTuple} with the extracted timestamp.
   */
  static class ExtractTimestamps implements Function.MapFunction<String, Tuple.TimestampedTuple<String>>
  {

    @Override
    public Tuple.TimestampedTuple<String> f(String input)
    {
      String[] items = input.split(",");
      String timestamp = tryParseTimestamp(items);

      return new Tuple.TimestampedTuple<>(Long.parseLong(timestamp), input);
    }
  }

  /**
   * Filter out readings for the stations along predefined 'routes', and output
   * (station, speed info) keyed on route.
   */
  static class ExtractStationSpeedFn implements Function.FlatMapFunction<Tuple.TimestampedTuple<String>, KeyValPair<String, StationSpeed>>
  {

    @Override
    public Iterable<KeyValPair<String, StationSpeed>> f(Tuple.TimestampedTuple<String> input)
    {

      ArrayList<KeyValPair<String, StationSpeed>> result = new ArrayList<>();
      String[] items = input.getValue().split(",");
      String stationType = tryParseStationType(items);
      // For this analysis, use only 'main line' station types
      if (stationType != null && stationType.equals("ML")) {
        Double avgSpeed = tryParseAvgSpeed(items);
        String stationId = tryParseStationId(items);
        // For this simple example, filter out everything but some hardwired routes.
        if (avgSpeed != null && stationId != null && sdStations.containsKey(stationId)) {
          StationSpeed stationSpeed =
              new StationSpeed(stationId, avgSpeed, input.getTimestamp());
          // The tuple key is the 'route' name stored in the 'sdStations' hash.
          KeyValPair<String, StationSpeed> outputValue = new KeyValPair<>(sdStations.get(stationId), stationSpeed);
          result.add(outputValue);
        }
      }
      return result;
    }
  }

  /**
   * For a given route, track average speed for the window. Calculate whether
   * traffic is currently slowing down, via a predefined threshold. If a supermajority of
   * speeds in this sliding window are less than the previous reading we call this a 'slowdown'.
   * Note: these calculations are for example purposes only, and are unrealistic and oversimplified.
   */
  static class GatherStats
      implements Function.FlatMapFunction<Tuple.WindowedTuple<KeyValPair<String, List<StationSpeed>>>, Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>>
  {
    @Override
    public Iterable<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> f(Tuple.WindowedTuple<KeyValPair<String, List<StationSpeed>>> input)
    {
      ArrayList<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> result = new ArrayList<>();
      String route = input.getValue().getKey();
      double speedSum = 0.0;
      int speedCount = 0;
      int speedups = 0;
      int slowdowns = 0;
      List<StationSpeed> infoList = Lists.newArrayList(input.getValue().getValue());
      // StationSpeeds sort by embedded timestamp.
      Collections.sort(infoList);
      Map<String, Double> prevSpeeds = new HashMap<>();
      // For all stations in the route, sum (non-null) speeds. Keep a count of the non-null speeds.
      for (StationSpeed item : infoList) {
        Double speed = item.getAvgSpeed();
        if (speed != null) {
          speedSum += speed;
          speedCount++;
          Double lastSpeed = prevSpeeds.get(item.getStationId());
          if (lastSpeed != null) {
            if (lastSpeed < speed) {
              speedups += 1;
            } else {
              slowdowns += 1;
            }
          }
          prevSpeeds.put(item.getStationId(), speed);
        }
      }
      if (speedCount == 0) {
        // No average to compute.
        return result;
      }
      double speedAvg = speedSum / speedCount;
      boolean slowdownEvent = slowdowns >= 2 * speedups;
      RouteInfo routeInfo = new RouteInfo(route, speedAvg, slowdownEvent);
      result.add(new Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>(input.getTimestamp(), new KeyValPair<String, RouteInfo>(route, routeInfo)));
      return result;
    }
  }

  /**
   * Output Pojo class for outputting result to JDBC.
   */
  static class OutputPojo
  {
    private Double avgSpeed;
    private Boolean slowdownEvent;
    private String key;
    private Long timestamp;

    public OutputPojo()
    {
    }

    public OutputPojo(Double avgSpeed, Boolean slowdownEvent, String key, Long timestamp)
    {
      this.avgSpeed = avgSpeed;
      this.slowdownEvent = slowdownEvent;
      this.key = key;
      this.timestamp = timestamp;
    }

    @Override
    public String toString()
    {
      return key + " + " + avgSpeed + " + " + slowdownEvent + " + " + timestamp;
    }

    public void setTimestamp(Long timestamp)
    {
      this.timestamp = timestamp;
    }

    public Long getTimestamp()
    {
      return timestamp;
    }

    public void setAvgSpeed(Double avgSpeed)
    {
      this.avgSpeed = avgSpeed;
    }

    public Double getAvgSpeed()
    {
      return avgSpeed;
    }

    public void setKey(String key)
    {
      this.key = key;
    }

    public String getKey()
    {
      return key;
    }

    public void setSlowdownEvent(Boolean slowdownEvent)
    {
      this.slowdownEvent = slowdownEvent;
    }

    public Boolean getSlowdownEvent()
    {
      return slowdownEvent;
    }

  }

  /**
   * Sink that records every result in a static map keyed by (timestamp, route), so that results can be read back
   * through {@link #getResult()}.
   */
  public static class Collector extends BaseOperator
  {
    private static Map<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> result = new HashMap<>();

    public static Map<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> getResult()
    {
      return result;
    }

    public final transient DefaultInputPort<OutputPojo> input = new DefaultInputPort<OutputPojo>()
    {
      @Override
      public void process(OutputPojo tuple)
      {
        result.put(new KeyValPair<Long, String>(tuple.getTimestamp(), tuple.getKey()), new KeyValPair<Double, Boolean>(tuple.getAvgSpeed(), tuple.getSlowdownEvent()));
      }
    };
  }

  /**
   * Format the results of the slowdown calculations to a OutputPojo.
   */
  static class FormatStatsFn implements Function.MapFunction<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>, OutputPojo>
  {
    @Override
    public OutputPojo f(Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>> input)
    {
      RouteInfo routeInfo = input.getValue().getValue();
      OutputPojo row = new OutputPojo(routeInfo.getAvgSpeed(), routeInfo.getSlowdownEvent(), input.getValue().getKey(), input.getTimestamp());
      return row;
    }
  }


  /**
   * This composite transformation extracts speed info from traffic station readings.
   * It groups the readings by 'route' and analyzes traffic slowdown for that route.
   * Lastly, it formats the results for JDBC.
   */
  static class TrackSpeed extends
      CompositeStreamTransform<WindowedStream<KeyValPair<String, StationSpeed>>, WindowedStream<OutputPojo>>
  {
    @Override
    public WindowedStream<OutputPojo> compose(WindowedStream<KeyValPair<String, StationSpeed>> inputStream)
    {
      // Apply a GroupByKey transform to collect a list of all station
      // readings for a given route.
      WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<StationSpeed>>>> timeGroup =
          inputStream
          .accumulateByKey(new Group<StationSpeed>(), new Function.ToKeyValue<KeyValPair<String, StationSpeed>, String, StationSpeed>()
          {
            @Override
            public Tuple<KeyValPair<String, StationSpeed>> f(KeyValPair<String, StationSpeed> input)
            {
              return new Tuple.TimestampedTuple<>(input.getValue().getTimestamp(), input);
            }
          }, name("GroupByKey"));

      // Analyze 'slowdown' over the route readings.
      WindowedStream<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> stats = timeGroup
          .flatMap(new GatherStats(), name("GatherStats"));

      // Format the results for writing to JDBC table.
      WindowedStream<OutputPojo> results = stats.map(new FormatStatsFn(), name("FormatStatsFn"));

      return results;
    }
  }


  /**
   * Parses field 3 of a reading as the average speed.
   *
   * @return the parsed speed, or {@code null} when the field is missing or not a number
   */
  private static Double tryParseAvgSpeed(String[] inputItems)
  {
    String rawSpeed = tryParseString(inputItems, 3);
    // An absent field is an expected case; check it explicitly instead of catching NullPointerException.
    if (rawSpeed == null) {
      return null;
    }
    try {
      return Double.parseDouble(rawSpeed);
    } catch (NumberFormatException e) {
      return null;
    }
  }

  private static String tryParseStationType(String[] inputItems)
  {
    return tryParseString(inputItems, 2);
  }

  private static String tryParseStationId(String[] inputItems)
  {
    return tryParseString(inputItems, 1);
  }

  private static String tryParseTimestamp(String[] inputItems)
  {
    return tryParseString(inputItems, 0);
  }

  /**
   * Returns the field at {@code index}, or {@code null} when the record does not have that many fields.
   */
  private static String tryParseString(String[] inputItems, int index)
  {
    // Valid positions are 0 .. length - 1. Requiring index < length keeps a short record from
    // throwing ArrayIndexOutOfBoundsException; it yields null instead.
    return index >= 0 && index < inputItems.length ? inputItems[index] : null;
  }

  /**
   * Define some small hard-wired San Diego 'routes' to track based on sensor station ID.
   */
  private static Map<String, String> buildStationInfo()
  {
    Map<String, String> stations = new Hashtable<String, String>();
    stations.put("1108413", "SDRoute1"); // from freeway 805 S
    stations.put("1108699", "SDRoute2"); // from freeway 78 E
    stations.put("1108702", "SDRoute2");
    return stations;
  }

  /**
   * A dummy generator to generate some traffic information.
   * Each {@link #emitTuples()} call emits one record per (station type, station id) combination, formatted as
   * {@code "<time>,<stationId>,<stationType>,<speed>"}, sleeping 50 ms between records.
   */
  public static class InfoGen extends BaseOperator implements InputOperator
  {
    public transient DefaultOutputPort<String> output = new DefaultOutputPort<>();

    private String[] stationTypes = new String[]{"ML", "BL", "GL"};
    private int[] stationIDs = new int[]{1108413, 1108699, 1108702};
    private double ave = 55.0;
    private long timestamp;
    private static final Duration RAND_RANGE = Duration.standardMinutes(10);
    private static int tupleCount = 0;

    public static int getTupleCount()
    {
      return tupleCount;
    }

    @Override
    public void setup(Context.OperatorContext context)
    {
      tupleCount = 0;
      timestamp = System.currentTimeMillis();
    }

    @Override
    public void emitTuples()
    {
      for (String stationType : stationTypes) {
        for (int stationID : stationIDs) {
          double speed = Math.random() * 20 + ave;
          long time = (long)(Math.random() * RAND_RANGE.getMillis()) + timestamp;
          output.emit(time + "," + stationID + "," + stationType + "," + speed);
          tupleCount++;
          try {
            // Throttle the generator so the example produces a slow, readable trickle of data.
            Thread.sleep(50);
          } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe it, and stop this round.
            Thread.currentThread().interrupt();
            return;
          }
        }
      }
    }
  }

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    InfoGen infoGen = new InfoGen();
    Collector collector = new Collector();

    // Create a stream from the input operator.
    ApexStream<Tuple.TimestampedTuple<String>> stream = StreamFactory.fromInput(infoGen, infoGen.output, name("infoGen"))

        // Extract the timestamp from the input and wrap it into a TimestampedTuple.
        .map(new ExtractTimestamps(), name("ExtractTimestamps"));

    stream
        // Extract the average speed of a station.
        .flatMap(new ExtractStationSpeedFn(), name("ExtractStationSpeedFn"))

        // Apply window and trigger option.
        .window(new WindowOption.SlidingTimeWindows(Duration.standardMinutes(WINDOW_DURATION), Duration.standardMinutes(WINDOW_SLIDE_EVERY)), new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(5000)).accumulatingFiredPanes())

        // Apply TrackSpeed composite transformation to compute the route information.
        .addCompositeStreams(new TrackSpeed())

        // print the result to console.
        .print(name("console"))
        .endWith(collector, collector.input, name("Collector"))
        .populateDag(dag);
  }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java ---------------------------------------------------------------------- diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java new file mode 100644 index 0000000..6332c66 --- /dev/null +++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java @@ -0,0 +1,254 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.sample.complete; + +import java.nio.charset.Charset; +import java.nio.charset.CharsetEncoder; + +import java.util.LinkedList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.joda.time.Duration; + +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.Window; +import org.apache.apex.malhar.lib.window.WindowOption; + +import org.apache.apex.malhar.stream.api.ApexStream; +import org.apache.apex.malhar.stream.api.CompositeStreamTransform; +import org.apache.apex.malhar.stream.api.WindowedStream; +import org.apache.apex.malhar.stream.api.function.Function; +import org.apache.apex.malhar.stream.api.impl.StreamFactory; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.contrib.twitter.TwitterSampleInput; +import com.datatorrent.lib.util.KeyValPair; + +import static org.apache.apex.malhar.stream.api.Option.Options.name; + +/** + * Auto Complete Hashtag Example with real time twitter input. In order to run this application, you need to create an app + * at https://apps.twitter.com, then generate your consumer and access keys and tokens, and enter those information + * accordingly in /resources/META-INF/properties.xml. + * + * The authentication requires following 4 information. + * Your application consumer key, + * Your application consumer secret, + * Your twitter access token, and + * Your twitter access token secret. 
+ * + * @since 3.5.0 + */
@ApplicationAnnotation(name = "TwitterAutoComplete")
public class TwitterAutoComplete implements StreamingApplication
{
  /**
   * Check whether every character in a string is ASCII encoding.
   */
  public static class StringUtils
  {
    /**
     * Retained for source compatibility only. {@link #isAscii(String)} does not use it, because a
     * {@link CharsetEncoder} is not safe for use by multiple concurrent threads.
     */
    static CharsetEncoder encoder = Charset.forName("US-ASCII").newEncoder();

    /**
     * Returns {@code true} when every char of {@code v} is in the 7-bit US-ASCII range (U+0000..U+007F).
     * This gives the same answer as a US-ASCII {@code CharsetEncoder.canEncode} check, but it keeps no shared
     * mutable state and is therefore safe to call concurrently.
     *
     * @param v the string to check; must not be {@code null}
     * @return whether {@code v} consists solely of ASCII characters ({@code true} for the empty string)
     */
    public static boolean isAscii(String v)
    {
      for (int i = 0; i < v.length(); i++) {
        if (v.charAt(i) > 0x7F) {
          return false;
        }
      }
      return true;
    }
  }

  /**
   * FlatMap function that extracts all hashtags (without the leading '#') from the text of a tweet.
   */
  private static class ExtractHashtags implements Function.FlatMapFunction<String, String>
  {
    // A hashtag is '#' followed by one or more non-whitespace characters. Compiled once because Pattern is
    // immutable and thread-safe.
    private static final Pattern HASHTAG = Pattern.compile("#\\S+");

    @Override
    public Iterable<String> f(String input)
    {
      List<String> result = new LinkedList<>();
      Matcher m = HASHTAG.matcher(input);
      while (m.find()) {
        // Drop the leading '#'.
        result.add(m.group().substring(1));
      }
      return result;
    }
  }

  /**
   * Lower latency, but more expensive.
   * Expands every candidate into all of its prefixes (at most 3 characters long, see the {@code AllPrefixes}
   * arguments below) and keeps the top {@code candidatesPerPrefix} candidates for each prefix.
   */
  private static class ComputeTopFlat
      extends CompositeStreamTransform<WindowedStream<CompletionCandidate>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>>
  {
    private final int candidatesPerPrefix;
    private final int minPrefix;

    public ComputeTopFlat(int candidatesPerPrefix, int minPrefix)
    {
      this.candidatesPerPrefix = candidatesPerPrefix;
      this.minPrefix = minPrefix;
    }

    @Override
    public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(
        WindowedStream<CompletionCandidate> input)
    {
      TopNByKey topNByKey = new TopNByKey();
      topNByKey.setN(candidatesPerPrefix);
      return input
        .<KeyValPair<String, CompletionCandidate>, WindowedStream<KeyValPair<String, CompletionCandidate>>>flatMap(new AllPrefixes(minPrefix, 3), name("Extract Prefixes"))
        .accumulateByKey(topNByKey, new Function.ToKeyValue<KeyValPair<String, CompletionCandidate>, String, CompletionCandidate>()
        {
          @Override
          public Tuple<KeyValPair<String, CompletionCandidate>> f(KeyValPair<String, CompletionCandidate> tuple)
          {
            // TODO: Should be removed after Auto-wrapping is supported.
            return new Tuple.WindowedTuple<>(Window.GlobalWindow.INSTANCE, tuple);
          }
        }, name("TopNByKey"));
    }
  }

  /**
   * FlatMap function that extracts all prefixes of the hashtag in the input CompletionCandidate. For each prefix
   * of length {@code minPrefix .. min(length, maxPrefix)}, it outputs a KeyValPair of the lower-cased prefix and
   * the CompletionCandidate.
   */
  private static class AllPrefixes implements Function.FlatMapFunction<CompletionCandidate, KeyValPair<String, CompletionCandidate>>
  {
    private final int minPrefix;
    private final int maxPrefix;

    public AllPrefixes()
    {
      this(0, Integer.MAX_VALUE);
    }

    public AllPrefixes(int minPrefix)
    {
      this(minPrefix, Integer.MAX_VALUE);
    }

    public AllPrefixes(int minPrefix, int maxPrefix)
    {
      this.minPrefix = minPrefix;
      this.maxPrefix = maxPrefix;
    }

    @Override
    public Iterable<KeyValPair<String, CompletionCandidate>> f(CompletionCandidate input)
    {
      List<KeyValPair<String, CompletionCandidate>> result = new LinkedList<>();
      String word = input.getValue();
      int longest = Math.min(word.length(), maxPrefix);
      for (int i = minPrefix; i <= longest; i++) {
        // Locale.ROOT keeps the prefix keys independent of the JVM's default locale (e.g. Turkish dotless i).
        result.add(new KeyValPair<>(word.substring(0, i).toLowerCase(java.util.Locale.ROOT), input));
      }
      return result;
    }
  }

  /**
   * A Composite stream transform that takes as input a list of tokens and returns
   * the most common tokens per prefix.
   */
  public static class ComputeTopCompletions
      extends CompositeStreamTransform<WindowedStream<String>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>>
  {
    private final int candidatesPerPrefix;
    // NOTE: currently not consulted by compose(), which always uses the flat (non-recursive) strategy.
    private final boolean recursive;

    protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive)
    {
      this.candidatesPerPrefix = candidatesPerPrefix;
      this.recursive = recursive;
    }

    public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive)
    {
      return new ComputeTopCompletions(candidatesPerPrefix, recursive);
    }

    @Override
    @SuppressWarnings("unchecked")
    public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(WindowedStream<String> inputStream)
    {
      // Count occurrences of each hashtag, then turn each (tag, count) pair into a CompletionCandidate.
      ApexStream<CompletionCandidate> candidates = inputStream
          .countByKey(new Function.ToKeyValue<String, String, Long>()
          {
            @Override
            public Tuple<KeyValPair<String, Long>> f(String input)
            {
              return new Tuple.PlainTuple<>(new KeyValPair<>(input, 1L));
            }
          }, name("Hashtag Count"))
          .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, CompletionCandidate>()
          {
            @Override
            public CompletionCandidate f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
            {
              return new CompletionCandidate(input.getValue().getKey(), input.getValue().getValue());
            }
          }, name("KeyValPair to CompletionCandidate"));

      return candidates.addCompositeStreams(new ComputeTopFlat(candidatesPerPrefix, 1));
    }
  }

  /**
   * FilterFunction that keeps only tweets made entirely of ASCII characters.
   */
  static class ASCIIFilter implements Function.FilterFunction<String>
  {
    @Override
    public boolean f(String input)
    {
      return StringUtils.isAscii(input);
    }
  }

  /**
   * Populate the dag with High-Level API.
   * The pipeline samples tweets, drops non-ASCII ones, extracts hashtags, applies a global window with
   * accumulating panes fired every 10 seconds, and computes the top 10 completions per prefix.
   *
   * @param dag  the DAG to populate
   * @param conf the application configuration (not read by this method)
   */
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    TwitterSampleInput input = new TwitterSampleInput();

    WindowOption windowOption = new WindowOption.GlobalWindow();

    ApexStream<String> tags = StreamFactory.fromInput(input, input.text, name("tweetSampler"))
        .filter(new ASCIIFilter(), name("ACSII Filter"))
        .flatMap(new ExtractHashtags(), name("Extract Hashtags"));

    ApexStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> s =
        tags.window(windowOption, new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(10)))
        .addCompositeStreams(ComputeTopCompletions.top(10, true)).print();

    s.populateDag(dag);
  }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java ---------------------------------------------------------------------- diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java new file mode 100644 index 0000000..bfdb268 --- /dev/null +++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java @@ -0,0 +1,285 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.sample.cookbook; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.WindowOption; +import org.apache.apex.malhar.lib.window.accumulation.ReduceFn; +import org.apache.apex.malhar.stream.api.ApexStream; +import org.apache.apex.malhar.stream.api.CompositeStreamTransform; +import org.apache.apex.malhar.stream.api.WindowedStream; +import org.apache.apex.malhar.stream.api.function.Function; +import org.apache.apex.malhar.stream.api.impl.StreamFactory; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.util.KeyValPair; + +import static org.apache.apex.malhar.stream.api.Option.Options.name; + +/** + * An example that reads the public 'Shakespeare' data, and for each word in + * the dataset that is over a given length, generates a string containing the + * list of play names in which that word appears + * + * <p>Concepts: the combine transform, which lets you combine the values in a + * key-grouped Collection + * + * + * @since 3.5.0 + */ +@ApplicationAnnotation(name = 
"CombinePerKeyExamples") +public class CombinePerKeyExamples implements StreamingApplication +{ + // Use the shakespeare public BigQuery sample + private static final String SHAKESPEARE_TABLE = "publicdata:samples.shakespeare"; + // We'll track words >= this word length across all plays in the table. + private static final int MIN_WORD_LENGTH = 0; + + /** + * Examines each row in the input table. If the word is greater than or equal to MIN_WORD_LENGTH, + * outputs word, play_name. + */ + static class ExtractLargeWordsFn implements Function.MapFunction<SampleBean, KeyValPair<String, String>> + { + + @Override + public KeyValPair<String, String> f(SampleBean input) + { + String playName = input.getCorpus(); + String word = input.getWord(); + if (word.length() >= MIN_WORD_LENGTH) { + return new KeyValPair<>(word, playName); + } else { + return null; + } + } + } + + + /** + * Prepares the output data which is in same bean + */ + static class FormatShakespeareOutputFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, String>>, SampleBean> + { + @Override + public SampleBean f(Tuple.WindowedTuple<KeyValPair<String, String>> input) + { + return new SampleBean(input.getValue().getKey(), input.getValue().getValue()); + } + } + + /** + * A reduce function to concat two strings together. + */ + public static class Concat extends ReduceFn<String> + { + @Override + public String reduce(String input1, String input2) + { + return input1 + ", " + input2; + } + } + + /** + * Reads the public 'Shakespeare' data, and for each word in the dataset + * over a given length, generates a string containing the list of play names + * in which that word appears. + */ + private static class PlaysForWord extends CompositeStreamTransform<ApexStream<SampleBean>, WindowedStream<SampleBean>> + { + + @Override + public WindowedStream<SampleBean> compose(ApexStream<SampleBean> inputStream) + { + return inputStream + // Extract words from the input SampleBeam stream. 
+ .map(new ExtractLargeWordsFn(), name("ExtractLargeWordsFn")) + + // Apply window and trigger option to the streams. + .window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1)) + + // Apply reduceByKey transformation to concat the names of all the plays that a word has appeared in together. + .reduceByKey(new Concat(), new Function.ToKeyValue<KeyValPair<String,String>, String, String>() + { + @Override + public Tuple<KeyValPair<String, String>> f(KeyValPair<String, String> input) + { + return new Tuple.PlainTuple<KeyValPair<String, String>>(input); + } + }, name("Concat")) + + // Format the output back to a SampleBeam object. + .map(new FormatShakespeareOutputFn(), name("FormatShakespeareOutputFn")); + } + } + + + /** + * A Java Beam class that contains information about a word appears in a corpus written by Shakespeare. + */ + public static class SampleBean + { + + public SampleBean() + { + + } + + public SampleBean(String word, String corpus) + { + this.word = word; + this.corpus = corpus; + } + + @Override + public String toString() + { + return this.word + " : " + this.corpus; + } + + private String word; + + private String corpus; + + public void setWord(String word) + { + this.word = word; + } + + public String getWord() + { + return word; + } + + public void setCorpus(String corpus) + { + this.corpus = corpus; + } + + public String getCorpus() + { + return corpus; + } + } + + /** + * A dummy info generator to generate {@link SampleBean} objects to mimic reading from real 'Shakespeare' + * data. 
+ */ + public static class SampleInput extends BaseOperator implements InputOperator + { + + public final transient DefaultOutputPort<SampleBean> beanOutput = new DefaultOutputPort(); + private String[] words = new String[]{"A", "B", "C", "D", "E", "F", "G"}; + private String[] corpuses = new String[]{"1", "2", "3", "4", "5", "6", "7", "8"}; + private static int i; + + @Override + public void setup(Context.OperatorContext context) + { + super.setup(context); + i = 0; + } + + @Override + public void emitTuples() + { + while (i < 1) { + for (String word : words) { + for (String corpus : corpuses) { + try { + Thread.sleep(50); + beanOutput.emit(new SampleBean(word, corpus)); + } catch (Exception e) { + // Ignore it + } + } + } + i++; + } + + } + } + + public static class Collector extends BaseOperator + { + private static List<SampleBean> result; + private static boolean done = false; + + public static List<SampleBean> getResult() + { + return result; + } + + public static boolean isDone() + { + return done; + } + + @Override + public void setup(Context.OperatorContext context) + { + result = new ArrayList<>(); + done = false; + } + + public final transient DefaultInputPort<SampleBean> input = new DefaultInputPort<SampleBean>() + { + @Override + public void process(SampleBean tuple) + { + if (tuple.getWord().equals("F")) { + done = true; + } + result.add(tuple); + } + }; + } + + /** + * Populate dag using High-Level API. 
+ * @param dag + * @param conf + */ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + SampleInput input = new SampleInput(); + Collector collector = new Collector(); + StreamFactory.fromInput(input, input.beanOutput, name("input")) + .addCompositeStreams(new PlaysForWord()) + .print(name("console")) + .endWith(collector, collector.input, name("Collector")) + .populateDag(dag); + + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java ---------------------------------------------------------------------- diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java new file mode 100644 index 0000000..4df5fe7 --- /dev/null +++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.stream.sample.cookbook; + +import java.util.Arrays; +import java.util.List; + +import org.joda.time.Duration; + +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.WindowOption; +import org.apache.apex.malhar.lib.window.accumulation.RemoveDuplicates; +import org.apache.apex.malhar.stream.api.ApexStream; +import org.apache.apex.malhar.stream.api.function.Function; +import org.apache.apex.malhar.stream.api.impl.StreamFactory; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.common.util.BaseOperator; + +import static org.apache.apex.malhar.stream.api.Option.Options.name; + +/** + * Beam DeDupExample. + * + * @since 3.5.0 + */ +@ApplicationAnnotation(name = "DeDupExample") +public class DeDupExample implements StreamingApplication +{ + + public static class Collector extends BaseOperator + { + private static Tuple.WindowedTuple<List<String>> result; + private static boolean done = false; + + public static Tuple.WindowedTuple<List<String>> getResult() + { + return result; + } + + public static boolean isDone() + { + return done; + } + + @Override + public void setup(Context.OperatorContext context) + { + super.setup(context); + result = new Tuple.WindowedTuple<>(); + done = false; + } + + public transient DefaultInputPort<Tuple.WindowedTuple<List<String>>> input = new DefaultInputPort<Tuple.WindowedTuple<List<String>>>() + { + @Override + public void process(Tuple.WindowedTuple<List<String>> tuple) + { + result = tuple; + if (result.getValue().contains("bye")) { + done = true; + } + } + }; + } + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + Collector collector = new 
Collector(); + + // Create a stream that reads from files in a local folder and output lines one by one to downstream. + ApexStream<String> stream = StreamFactory.fromFolder("./src/test/resources/wordcount", name("textInput")) + + // Extract all the words from the input line of text. + .flatMap(new Function.FlatMapFunction<String, String>() + { + @Override + public Iterable<String> f(String input) + { + return Arrays.asList(input.split("[\\p{Punct}\\s]+")); + } + }, name("ExtractWords")) + + // Change the words to lower case, also shutdown the app when the word "bye" is detected. + .map(new Function.MapFunction<String, String>() + { + @Override + public String f(String input) + { + return input.toLowerCase(); + } + }, name("ToLowerCase")); + + // Apply window and trigger option. + stream.window(new WindowOption.GlobalWindow(), + new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(1))) + + // Remove the duplicate words and print out the result. + .accumulate(new RemoveDuplicates<String>(), name("RemoveDuplicates")) + .print(name("console")) + .endWith(collector, collector.input) + .populateDag(dag); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java ---------------------------------------------------------------------- diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java new file mode 100644 index 0000000..834964c --- /dev/null +++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.sample.cookbook; + +/** + * Tuple class for JDBC input of {@link MaxPerKeyExamples}. + * + * @since 3.5.0 + */ +public class InputPojo extends Object +{ + private int month; + private int day; + private int year; + private double meanTemp; + + @Override + public String toString() + { + return "PojoEvent [month=" + getMonth() + ", day=" + getDay() + ", year=" + getYear() + ", meanTemp=" + getMeanTemp() + "]"; + } + + public void setMonth(int month) + { + this.month = month; + } + + public int getMonth() + { + return this.month; + } + + public void setDay(int day) + { + this.day = day; + } + + public int getDay() + { + return day; + } + + public void setYear(int year) + { + this.year = year; + } + + public int getYear() + { + return year; + } + + public void setMeanTemp(double meanTemp) + { + this.meanTemp = meanTemp; + } + + public double getMeanTemp() + { + return meanTemp; + } +}
