http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java index 5d4c628..ecd71ae 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java @@ -18,19 +18,33 @@ */ package org.apache.apex.malhar.stream.sample.cookbook; +import java.util.ArrayList; +import java.util.List; + +import org.apache.apex.malhar.lib.window.TriggerOption; import org.apache.apex.malhar.lib.window.Tuple; import org.apache.apex.malhar.lib.window.WindowOption; +import org.apache.apex.malhar.lib.window.impl.accumulation.ReduceFn; import org.apache.apex.malhar.stream.api.ApexStream; import org.apache.apex.malhar.stream.api.CompositeStreamTransform; +import org.apache.apex.malhar.stream.api.WindowedStream; import org.apache.apex.malhar.stream.api.function.Function; import org.apache.apex.malhar.stream.api.impl.StreamFactory; -import org.apache.apex.malhar.stream.api.impl.accumulation.ReduceFn; + +import org.apache.hadoop.conf.Configuration; import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.InputOperator; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.common.util.BaseOperator; import com.datatorrent.lib.util.KeyValPair; +import static org.apache.apex.malhar.stream.api.Option.Options.name; + /** * An 
example that reads the public 'Shakespeare' data, and for each word in * the dataset that is over a given length, generates a string containing the @@ -40,12 +54,13 @@ import com.datatorrent.lib.util.KeyValPair; * key-grouped Collection * */ -public class CombinePerKeyExamples +@ApplicationAnnotation(name = "CombinePerKeyExamples") +public class CombinePerKeyExamples implements StreamingApplication { // Use the shakespeare public BigQuery sample private static final String SHAKESPEARE_TABLE = "publicdata:samples.shakespeare"; // We'll track words >= this word length across all plays in the table. - private static final int MIN_WORD_LENGTH = 9; + private static final int MIN_WORD_LENGTH = 0; /** * Examines each row in the input table. If the word is greater than or equal to MIN_WORD_LENGTH, @@ -76,70 +91,59 @@ public class CombinePerKeyExamples @Override public SampleBean f(Tuple.WindowedTuple<KeyValPair<String, String>> input) { - return new SampleBean(input.getValue().getKey(), input.getValue().getValue(), null); + return new SampleBean(input.getValue().getKey(), input.getValue().getValue()); } } - + + /** + * A reduce function to concat two strings together. + */ + public static class Concat extends ReduceFn<String> + { + @Override + public String reduce(String input1, String input2) + { + return input1 + ", " + input2; + } + } + /** * Reads the public 'Shakespeare' data, and for each word in the dataset * over a given length, generates a string containing the list of play names * in which that word appears. 
*/ - static class PlaysForWord - extends CompositeStreamTransform<SampleBean, SampleBean> + private static class PlaysForWord extends CompositeStreamTransform<ApexStream<SampleBean>, WindowedStream<SampleBean>> { - + @Override - public ApexStream<SampleBean> compose(ApexStream<SampleBean> inputStream) + public WindowedStream<SampleBean> compose(ApexStream<SampleBean> inputStream) { - // fix this later - return inputStream.map(new ExtractLargeWordsFn()) - .window(new WindowOption.GlobalWindow()) - .reduceByKey(new ReduceFn<String>() - { - @Override - public String defaultAccumulatedValue() - { - return ""; - } - - @Override - public String accumulate(String accumulatedValue, String input) - { - return accumulatedValue + "," + input; - } - - @Override - public String merge(String accumulatedValue1, String accumulatedValue2) - { - return accumulatedValue1 + "," + accumulatedValue2; - } - - @Override - public String getOutput(String accumulatedValue) - { - return accumulatedValue; - } - - @Override - public String getRetraction(String value) - { - return value; - } - }, new Function.MapFunction<KeyValPair<String, String>, Tuple<KeyValPair<String, String>>>() - + return inputStream + // Extract words from the input SampleBeam stream. + .map(new ExtractLargeWordsFn(), name("ExtractLargeWordsFn")) + + // Apply window and trigger option to the streams. + .window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1)) + + // Apply reduceByKey transformation to concat the names of all the plays that a word has appeared in together. + .reduceByKey(new Concat(), new Function.ToKeyValue<KeyValPair<String,String>, String, String>() { @Override public Tuple<KeyValPair<String, String>> f(KeyValPair<String, String> input) { - return null; + return new Tuple.PlainTuple<KeyValPair<String, String>>(input); } - }) - .map(new FormatShakespeareOutputFn()); + }, name("Concat")) + + // Format the output back to a SampleBeam object. 
+ .map(new FormatShakespeareOutputFn(), name("FormatShakespeareOutputFn")); } } - - + + + /** + * A Java Beam class that contains information about a word appears in a corpus written by Shakespeare. + */ public static class SampleBean { @@ -148,17 +152,20 @@ public class CombinePerKeyExamples } - public SampleBean(String word, String all_plays, String corpus) + public SampleBean(String word, String corpus) { this.word = word; - this.all_plays = all_plays; this.corpus = corpus; } - + + @Override + public String toString() + { + return this.word + " : " + this.corpus; + } + private String word; - private String all_plays; - private String corpus; public void setWord(String word) @@ -180,58 +187,87 @@ public class CombinePerKeyExamples { return corpus; } - - public void setAll_plays(String all_plays) - { - this.all_plays = all_plays; - } - - public String getAll_plays() - { - return all_plays; - } } - - public static class SampleInput implements InputOperator + + /** + * A dummy info generator to generate {@link SampleBean} objects to mimic reading from real 'Shakespeare' + * data. 
+ */ + public static class SampleInput extends BaseOperator implements InputOperator { public final transient DefaultOutputPort<SampleBean> beanOutput = new DefaultOutputPort(); - - @Override - public void emitTuples() + private String[] words = new String[]{"A", "B", "C", "D", "E", "F", "G"}; + private String[] corpuses = new String[]{"1", "2", "3", "4", "5", "6", "7", "8"}; + private static int i; + + public static int getI() { - + return i; } - + @Override - public void beginWindow(long l) + public void setup(Context.OperatorContext context) { - + super.setup(context); + i = 0; } - + @Override - public void endWindow() + public void emitTuples() { - + while (i < 1) { + for (String word : words) { + for (String corpus : corpuses) { + beanOutput.emit(new SampleBean(word, corpus)); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + // Ignore it + } + } + } + i++; + } + } - + } + + public static class Collector extends BaseOperator + { + static List<SampleBean> result; + @Override public void setup(Context.OperatorContext context) { - + result = new ArrayList<>(); } - - @Override - public void teardown() + + public final transient DefaultInputPort<SampleBean> input = new DefaultInputPort<SampleBean>() { - - } + @Override + public void process(SampleBean tuple) + { + result.add(tuple); + } + }; } - - - public static void main(String[] args) throws Exception + + /** + * Populate dag using High-Level API. + * @param dag + * @param conf + */ + @Override + public void populateDAG(DAG dag, Configuration conf) { SampleInput input = new SampleInput(); - StreamFactory.fromInput(input, input.beanOutput).addCompositeStreams(new PlaysForWord()); + Collector collector = new Collector(); + StreamFactory.fromInput(input, input.beanOutput, name("input")) + .addCompositeStreams(new PlaysForWord()) + .print() + .endWith(collector, collector.input, name("Collector")) + .populateDag(dag); + } }
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java new file mode 100644 index 0000000..53426f3 --- /dev/null +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.stream.sample.cookbook; + +import java.util.Arrays; +import java.util.List; + +import org.joda.time.Duration; + +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.WindowOption; +import org.apache.apex.malhar.lib.window.impl.accumulation.RemoveDuplicates; +import org.apache.apex.malhar.stream.api.ApexStream; +import org.apache.apex.malhar.stream.api.function.Function; +import org.apache.apex.malhar.stream.api.impl.StreamFactory; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.common.util.BaseOperator; + +import static org.apache.apex.malhar.stream.api.Option.Options.name; + +/** + * Beam DeDupExample. + */ +@ApplicationAnnotation(name = "DeDupExample") +public class DeDupExample implements StreamingApplication +{ + + public static class Collector extends BaseOperator + { + private static Tuple.WindowedTuple<List<String>> result; + private static boolean done = false; + + public static Tuple.WindowedTuple<List<String>> getResult() + { + return result; + } + + public static boolean isDone() + { + return done; + } + + @Override + public void setup(Context.OperatorContext context) + { + super.setup(context); + result = new Tuple.WindowedTuple<>(); + done = false; + } + + public transient DefaultInputPort<Tuple.WindowedTuple<List<String>>> input = new DefaultInputPort<Tuple.WindowedTuple<List<String>>>() + { + @Override + public void process(Tuple.WindowedTuple<List<String>> tuple) + { + result = tuple; + if (result.getValue().contains("bye")) { + done = true; + } + } + }; + } + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + Collector collector = new Collector(); + + 
// Create a stream that reads from files in a local folder and output lines one by one to downstream. + ApexStream<String> stream = StreamFactory.fromFolder("./src/test/resources/wordcount", name("textInput")) + + // Extract all the words from the input line of text. + .flatMap(new Function.FlatMapFunction<String, String>() + { + @Override + public Iterable<String> f(String input) + { + return Arrays.asList(input.split("[\\p{Punct}\\s]+")); + } + }, name("ExtractWords")) + + // Change the words to lower case, also shutdown the app when the word "bye" is detected. + .map(new Function.MapFunction<String, String>() + { + @Override + public String f(String input) + { + return input.toLowerCase(); + } + }, name("ToLowerCase")); + + // Apply window and trigger option. + stream.window(new WindowOption.GlobalWindow(), + new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(1))) + + // Remove the duplicate words and print out the result. + .accumulate(new RemoveDuplicates<String>(), name("RemoveDuplicates")).print().endWith(collector, collector.input) + + .populateDag(dag); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java new file mode 100644 index 0000000..3643eab --- /dev/null +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.sample.cookbook; + +/** + * Tuple class for JDBC input of {@link MaxPerKeyExamples}. + */ +public class InputPojo extends Object +{ + private int month; + private int day; + private int year; + private double meanTemp; + + @Override + public String toString() + { + return "PojoEvent [month=" + getMonth() + ", day=" + getDay() + ", year=" + getYear() + ", meanTemp=" + getMeanTemp() + "]"; + } + + public void setMonth(int month) + { + this.month = month; + } + + public int getMonth() + { + return this.month; + } + + public void setDay(int day) + { + this.day = day; + } + + public int getDay() + { + return day; + } + + public void setYear(int year) + { + this.year = year; + } + + public int getYear() + { + return year; + } + + public void setMeanTemp(double meanTemp) + { + this.meanTemp = meanTemp; + } + + public double getMeanTemp() + { + return meanTemp; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java new file mode 100644 index 
0000000..97b2696 --- /dev/null +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java @@ -0,0 +1,203 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.sample.cookbook; + +import java.util.List; + +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.Window; +import org.apache.apex.malhar.lib.window.WindowOption; +import org.apache.apex.malhar.lib.window.impl.accumulation.Max; +import org.apache.apex.malhar.stream.api.ApexStream; +import org.apache.apex.malhar.stream.api.CompositeStreamTransform; +import org.apache.apex.malhar.stream.api.WindowedStream; +import org.apache.apex.malhar.stream.api.function.Function; +import org.apache.apex.malhar.stream.api.impl.StreamFactory; +import org.apache.hadoop.conf.Configuration; + +import static java.sql.Types.DOUBLE; +import static java.sql.Types.INTEGER; + +import com.google.common.collect.Lists; +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import 
com.datatorrent.lib.db.jdbc.JdbcFieldInfo; +import com.datatorrent.lib.db.jdbc.JdbcPOJOInputOperator; +import com.datatorrent.lib.db.jdbc.JdbcPOJOInsertOutputOperator; +import com.datatorrent.lib.db.jdbc.JdbcStore; +import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore; +import com.datatorrent.lib.util.FieldInfo; +import com.datatorrent.lib.util.KeyValPair; + +import static org.apache.apex.malhar.stream.api.Option.Options.name; + +/** + * MaxPerKeyExamples Application from Beam + */ +@ApplicationAnnotation(name = "MaxPerKeyExamples") +public class MaxPerKeyExamples implements StreamingApplication +{ + + /** + * A map function to extract the mean temperature from {@link InputPojo}. + */ + public static class ExtractTempFn implements Function.MapFunction<InputPojo, KeyValPair<Integer, Double>> + { + @Override + public KeyValPair<Integer, Double> f(InputPojo row) + { + Integer month = row.getMonth(); + Double meanTemp = row.getMeanTemp(); + return new KeyValPair<Integer, Double>(month, meanTemp); + } + } + + + /** + * A map function to format output to {@link OutputPojo}. + */ + public static class FormatMaxesFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<Integer, Double>>, OutputPojo> + { + @Override + public OutputPojo f(Tuple.WindowedTuple<KeyValPair<Integer, Double>> input) + { + OutputPojo row = new OutputPojo(); + row.setMonth(input.getValue().getKey()); + row.setMeanTemp(input.getValue().getValue()); + return row; + } + } + + /** + * A composite transformation to perform three tasks: + * 1. extract the month and its mean temperature from input pojo. + * 2. find the maximum mean temperature for every month. + * 3. format the result to a output pojo object. + */ + public static class MaxMeanTemp extends CompositeStreamTransform<WindowedStream<InputPojo>, WindowedStream<OutputPojo>> + { + @Override + public WindowedStream<OutputPojo> compose(WindowedStream<InputPojo> rows) + { + // InputPojo... => <month, meanTemp> ... 
+ WindowedStream<KeyValPair<Integer, Double>> temps = rows.map(new ExtractTempFn(), name("ExtractTempFn")); + + // month, meanTemp... => <month, max mean temp>... + WindowedStream<Tuple.WindowedTuple<KeyValPair<Integer, Double>>> tempMaxes = + temps.accumulateByKey(new Max<Double>(), + new Function.ToKeyValue<KeyValPair<Integer, Double>, Integer, Double>() + { + @Override + public Tuple<KeyValPair<Integer, Double>> f(KeyValPair<Integer, Double> input) + { + return new Tuple.WindowedTuple<KeyValPair<Integer, Double>>(Window.GLOBAL_WINDOW, input); + } + }, name("MaxPerMonth")); + + // <month, max>... => OutputPojo... + WindowedStream<OutputPojo> results = tempMaxes.map(new FormatMaxesFn(), name("FormatMaxesFn")); + + return results; + } + } + + /** + * Method to set field info for {@link JdbcPOJOInputOperator}. + * @return + */ + private List<FieldInfo> addInputFieldInfos() + { + List<FieldInfo> fieldInfos = Lists.newArrayList(); + fieldInfos.add(new FieldInfo("MONTH", "month", FieldInfo.SupportType.INTEGER)); + fieldInfos.add(new FieldInfo("DAY", "day", FieldInfo.SupportType.INTEGER)); + fieldInfos.add(new FieldInfo("YEAR", "year", FieldInfo.SupportType.INTEGER)); + fieldInfos.add(new FieldInfo("MEANTEMP", "meanTemp", FieldInfo.SupportType.DOUBLE)); + return fieldInfos; + } + + /** + * Method to set field info for {@link JdbcPOJOInsertOutputOperator}. + * @return + */ + private List<JdbcFieldInfo> addOutputFieldInfos() + { + List<JdbcFieldInfo> fieldInfos = Lists.newArrayList(); + fieldInfos.add(new JdbcFieldInfo("MONTH", "month", JdbcFieldInfo.SupportType.INTEGER, INTEGER)); + fieldInfos.add(new JdbcFieldInfo("MEANTEMP", "meanTemp", JdbcFieldInfo.SupportType.DOUBLE, DOUBLE)); + return fieldInfos; + } + + + /** + * Populate the dag using High-Level API. 
+ * @param dag + * @param conf + */ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + JdbcPOJOInputOperator jdbcInput = new JdbcPOJOInputOperator(); + jdbcInput.setFieldInfos(addInputFieldInfos()); + + JdbcStore store = new JdbcStore(); + jdbcInput.setStore(store); + + JdbcPOJOInsertOutputOperator jdbcOutput = new JdbcPOJOInsertOutputOperator(); + jdbcOutput.setFieldInfos(addOutputFieldInfos()); + JdbcTransactionalStore outputStore = new JdbcTransactionalStore(); + jdbcOutput.setStore(outputStore); + + // Create stream that reads from a Jdbc Input. + ApexStream<Object> stream = StreamFactory.fromInput(jdbcInput, jdbcInput.outputPort, name("jdbcInput")) + + // Apply window and trigger option to the stream. + .window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1)) + + // Because Jdbc Input sends out a stream of Object, need to cast them to InputPojo. + .map(new Function.MapFunction<Object, InputPojo>() + { + @Override + public InputPojo f(Object input) + { + return (InputPojo)input; + } + }, name("ObjectToInputPojo")) + + // Plug in the composite transformation to the stream to calculate the maximum temperature for each month. + .addCompositeStreams(new MaxMeanTemp()) + + // Cast the resulted OutputPojo to Object for Jdbc Output to consume. + .map(new Function.MapFunction<OutputPojo, Object>() + { + @Override + public Object f(OutputPojo input) + { + return (Object)input; + } + }, name("OutputPojoToObject")) + + // Output the result to Jdbc Output. 
+ .endWith(jdbcOutput, jdbcOutput.input, name("jdbcOutput")); + + stream.populateDag(dag); + + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java new file mode 100644 index 0000000..db2a09e --- /dev/null +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.sample.cookbook; + +/** + * OutputPojo Tuple Class for jdbcOutput of {@link MaxPerKeyExamples}. 
+ */ +public class OutputPojo +{ + private int month; + private double meanTemp; + + @Override + public String toString() + { + return "PojoEvent [month=" + getMonth() + ", meanTemp=" + getMeanTemp() + "]"; + } + + public void setMonth(int month) + { + this.month = month; + } + + public int getMonth() + { + return this.month; + } + + public void setMeanTemp(double meanTemp) + { + this.meanTemp = meanTemp; + } + + public double getMeanTemp() + { + return meanTemp; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java index 903f624..bf23e3a 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java @@ -24,6 +24,7 @@ import java.util.Objects; import org.joda.time.Duration; import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.Tuple; import org.apache.apex.malhar.lib.window.WindowOption; import org.apache.apex.malhar.stream.api.ApexStream; import org.apache.apex.malhar.stream.api.CompositeStreamTransform; @@ -85,31 +86,31 @@ import com.datatorrent.lib.util.KeyValPair; * Note: When you start up your pipeline, you'll initially see results from 'late' data. Wait after * the window duration, until the first pane of non-late data has been emitted, to see more * interesting results. 
- * {@code SELECT * FROM enter_table_name WHERE trigger_type = "default" ORDER BY window DESC} + * {@code SELECT * FROM enter_table_name WHERE triggerType = "default" ORDER BY window DESC} * * <p> To see the late data i.e. dropped by the default trigger, - * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "withAllowedLateness" and - * (timing = "LATE" or timing = "ON_TIME") and freeway = "5" ORDER BY window DESC, processing_time} + * {@code SELECT * FROM <enter_table_name> WHERE triggerType = "withAllowedLateness" and + * (timing = "LATE" or timing = "ON_TIME") and freeway = "5" ORDER BY window DESC, processingTime} * * <p>To see the the difference between accumulation mode and discarding mode, * {@code SELECT * FROM <enter_table_name> WHERE (timing = "LATE" or timing = "ON_TIME") AND - * (trigger_type = "withAllowedLateness" or trigger_type = "sequential") and freeway = "5" ORDER BY - * window DESC, processing_time} + * (triggerType = "withAllowedLateness" or triggerType = "sequential") and freeway = "5" ORDER BY + * window DESC, processingTime} * * <p> To see speculative results every minute, - * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "speculative" and freeway = "5" - * ORDER BY window DESC, processing_time} + * {@code SELECT * FROM <enter_table_name> WHERE triggerType = "speculative" and freeway = "5" + * ORDER BY window DESC, processingTime} * * <p> To see speculative results every five minutes after the end of the window - * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "sequential" and timing != "EARLY" - * and freeway = "5" ORDER BY window DESC, processing_time} + * {@code SELECT * FROM <enter_table_name> WHERE triggerType = "sequential" and timing != "EARLY" + * and freeway = "5" ORDER BY window DESC, processingTime} * * <p> To see the first and the last pane for a freeway in a window for all the trigger types, * {@code SELECT * FROM <enter_table_name> WHERE (isFirst = true or isLast = true) ORDER BY window} 
* * <p> To reduce the number of results for each query we can add additional where clauses. * For examples, To see the results of the default trigger, - * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "default" AND freeway = "5" AND + * {@code SELECT * FROM <enter_table_name> WHERE triggerType = "default" AND freeway = "5" AND * window = "<enter_window_interval>"} * * <p> The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C) @@ -135,7 +136,7 @@ public class TriggerExample * The example uses "freeway" as the key. Event time is the timestamp associated with the data * element and processing time is the time when the data element gets processed in the pipeline. * For freeway 5, suppose there are 10 elements in the [10:00:00, 10:30:00) window. - * Key (freeway) | Value (total_flow) | event time | processing time + * Key (freeway) | Value (totalFlow) | event time | processing time * 5 | 50 | 10:00:03 | 10:00:47 * 5 | 30 | 10:01:00 | 10:01:03 * 5 | 30 | 10:02:00 | 11:07:00 @@ -157,7 +158,7 @@ public class TriggerExample * close at 10:44:59, when the watermark passes 10:30:00. 
*/ static class CalculateTotalFlow - extends CompositeStreamTransform<String, SampleBean> + extends CompositeStreamTransform<ApexStream<String>, WindowedStream<SampleBean>> { private int windowDuration; @@ -167,7 +168,7 @@ public class TriggerExample } @Override - public ApexStream<SampleBean> compose(ApexStream<String> inputStream) + public WindowedStream<SampleBean> compose(ApexStream<String> inputStream) { // Concept #1: The default triggering behavior // By default Dataflow uses a trigger which fires when the watermark has passed the end of the @@ -182,14 +183,14 @@ public class TriggerExample // The results for the example above with the default trigger and zero allowed lateness // would be: - // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing + // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing // 5 | 260 | 6 | true | true | ON_TIME // At 11:03:00 (processing time) the system watermark may have advanced to 10:54:00. As a // result, when the data record with event time 10:05:00 arrives at 11:03:00, it is considered // late, and dropped. 
- - ApexStream<SampleBean> defaultTriggerResults = inputStream + + WindowedStream<SampleBean> defaultTriggerResults = inputStream .window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)), new TriggerOption().discardingFiredPanes()) .addCompositeStreams(new TotalFlow("default")); @@ -205,13 +206,13 @@ public class TriggerExample // The results for the example above with the default trigger and ONE_DAY allowed lateness // would be: - // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing + // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing // 5 | 260 | 6 | true | false | ON_TIME // 5 | 60 | 1 | false | false | LATE // 5 | 30 | 1 | false | false | LATE // 5 | 20 | 1 | false | false | LATE // 5 | 60 | 1 | false | false | LATE - ApexStream<SampleBean> withAllowedLatenessResults = inputStream + WindowedStream<SampleBean> withAllowedLatenessResults = inputStream .window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)), new TriggerOption().discardingFiredPanes(), Duration.standardDays(1)) @@ -226,7 +227,7 @@ public class TriggerExample // We also use accumulatingFiredPanes to build up the results across each pane firing. // The results for the example above for this trigger would be: - // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing + // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing // 5 | 80 | 2 | true | false | EARLY // 5 | 100 | 3 | false | false | EARLY // 5 | 260 | 6 | false | false | EARLY @@ -258,7 +259,7 @@ public class TriggerExample // Every pane produced will either be EARLY, ON_TIME or LATE. 
// The results for the example above for this trigger would be: - // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing + // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing // 5 | 80 | 2 | true | false | EARLY // 5 | 100 | 3 | false | false | EARLY // 5 | 260 | 6 | false | false | EARLY @@ -267,7 +268,7 @@ public class TriggerExample // 5 | 430 | 10 | false | false | LATE // For more possibilities of how to build advanced triggers, see {@link Trigger}. - ApexStream<SampleBean> sequentialResults = inputStream + WindowedStream<SampleBean> sequentialResults = inputStream .window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)), // Speculative every ONE_MINUTE new TriggerOption().withEarlyFiringsAtEvery(Duration.standardMinutes(1)) @@ -293,7 +294,7 @@ public class TriggerExample * objects, to save to BigQuery. */ static class TotalFlow extends - CompositeStreamTransform<String, SampleBean> + CompositeStreamTransform<WindowedStream<String>, WindowedStream<SampleBean>> { private String triggerType; @@ -303,13 +304,10 @@ public class TriggerExample } @Override - public ApexStream<SampleBean> compose(ApexStream<String> inputStream) + public WindowedStream<SampleBean> compose(WindowedStream<String> inputStream) { - if (!(inputStream instanceof WindowedStream)) { - throw new RuntimeException("Not supported here"); - } - WindowedStream<String> windowedStream = (WindowedStream<String>)inputStream; - ApexStream<KeyValPair<String, Iterable<Integer>>> flowPerFreeway = windowedStream + + WindowedStream<KeyValPair<String, Iterable<Integer>>> flowPerFreeway = inputStream .groupByKey(new ExtractFlowInfo()); return flowPerFreeway @@ -361,13 +359,13 @@ public class TriggerExample { } - private String trigger_type; + private String triggerType; private String freeway; - private int total_flow; + private int totalFlow; - private long number_of_records; + private long numberOfRecords; private String 
window; @@ -377,9 +375,9 @@ public class TriggerExample private Date timing; - private Date event_time; + private Date eventTime; - private Date processing_time; + private Date processingTime; @Override public boolean equals(Object o) @@ -391,50 +389,49 @@ public class TriggerExample return false; } SampleBean that = (SampleBean)o; - return total_flow == that.total_flow && - number_of_records == that.number_of_records && + return totalFlow == that.totalFlow && + numberOfRecords == that.numberOfRecords && isFirst == that.isFirst && isLast == that.isLast && - Objects.equals(trigger_type, that.trigger_type) && + Objects.equals(triggerType, that.triggerType) && Objects.equals(freeway, that.freeway) && Objects.equals(window, that.window) && Objects.equals(timing, that.timing) && - Objects.equals(event_time, that.event_time) && - Objects.equals(processing_time, that.processing_time); + Objects.equals(eventTime, that.eventTime) && + Objects.equals(processingTime, that.processingTime); } @Override public int hashCode() { return Objects - .hash(trigger_type, freeway, total_flow, number_of_records, window, isFirst, isLast, timing, event_time, - processing_time); + .hash(triggerType, freeway, totalFlow, numberOfRecords, window, isFirst, isLast, timing, eventTime, + processingTime); } - public SampleBean(String trigger_type, String freeway, int total_flow, long number_of_records, String window, - boolean isFirst, boolean isLast, Date timing, Date event_time, Date processing_time) + public SampleBean(String triggerType, String freeway, int totalFlow, long numberOfRecords, String window, boolean isFirst, boolean isLast, Date timing, Date eventTime, Date processingTime) { - this.trigger_type = trigger_type; + this.triggerType = triggerType; this.freeway = freeway; - this.total_flow = total_flow; - this.number_of_records = number_of_records; + this.totalFlow = totalFlow; + this.numberOfRecords = numberOfRecords; this.window = window; this.isFirst = isFirst; this.isLast = isLast; 
this.timing = timing; - this.event_time = event_time; - this.processing_time = processing_time; + this.eventTime = eventTime; + this.processingTime = processingTime; } - public String getTrigger_type() + public String getTriggerType() { - return trigger_type; + return triggerType; } - public void setTrigger_type(String trigger_type) + public void setTriggerType(String triggerType) { - this.trigger_type = trigger_type; + this.triggerType = triggerType; } public String getFreeway() @@ -447,24 +444,24 @@ public class TriggerExample this.freeway = freeway; } - public int getTotal_flow() + public int getTotalFlow() { - return total_flow; + return totalFlow; } - public void setTotal_flow(int total_flow) + public void setTotalFlow(int totalFlow) { - this.total_flow = total_flow; + this.totalFlow = totalFlow; } - public long getNumber_of_records() + public long getNumberOfRecords() { - return number_of_records; + return numberOfRecords; } - public void setNumber_of_records(long number_of_records) + public void setNumberOfRecords(long numberOfRecords) { - this.number_of_records = number_of_records; + this.numberOfRecords = numberOfRecords; } public String getWindow() @@ -507,24 +504,24 @@ public class TriggerExample this.timing = timing; } - public Date getEvent_time() + public Date getEventTime() { - return event_time; + return eventTime; } - public void setEvent_time(Date event_time) + public void setEventTime(Date eventTime) { - this.event_time = event_time; + this.eventTime = eventTime; } - public Date getProcessing_time() + public Date getProcessingTime() { - return processing_time; + return processingTime; } - public void setProcessing_time(Date processing_time) + public void setProcessingTime(Date processingTime) { - this.processing_time = processing_time; + this.processingTime = processingTime; } } @@ -532,10 +529,10 @@ public class TriggerExample * Extract the freeway and total flow in a reading. 
* Freeway is used as key since we are calculating the total flow for each freeway. */ - static class ExtractFlowInfo implements Function.MapFunction<String, KeyValPair<String, Integer>> + static class ExtractFlowInfo implements Function.ToKeyValue<String, String, Integer> { @Override - public KeyValPair<String, Integer> f(String input) + public Tuple<KeyValPair<String, Integer>> f(String input) { String[] laneInfo = input.split(","); if (laneInfo[0].equals("timestamp")) { @@ -553,7 +550,7 @@ public class TriggerExample if (totalFlow == null || totalFlow <= 0) { return null; } - return new KeyValPair<>(freeway, totalFlow); + return new Tuple.PlainTuple<>(new KeyValPair<>(freeway, totalFlow)); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/resources/META-INF/properties.xml b/demos/highlevelapi/src/main/resources/META-INF/properties.xml new file mode 100644 index 0000000..ead0460 --- /dev/null +++ b/demos/highlevelapi/src/main/resources/META-INF/properties.xml @@ -0,0 +1,141 @@ +<?xml version="1.0"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> +<configuration> + <!-- + <property> + <name>dt.application.{appName}.operator.{opName}.prop.{propName}</name> + <value>some-default-value (if value is not specified, it is required from the user or custom config when launching)</value> + </property> + --> + + <!-- Properties for TwitterAutoComplete, please fill out all of them to make the application work --> + <property> + <name>dt.application.TwitterAutoComplete.operator.tweetSampler.consumerKey</name> + <value></value> + </property> + <property> + <name>dt.application.TwitterAutoComplete.operator.tweetSampler.consumerSecret</name> + <value></value> + </property> + <property> + <name>dt.application.TwitterAutoComplete.operator.tweetSampler.accessToken</name> + <value></value> + </property> + <property> + <name>dt.application.TwitterAutoComplete.operator.tweetSampler.accessTokenSecret</name> + <value></value> + </property> + + <!-- Properties for StreamingWordExtract --> + <property> + <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.store.userName</name> + <value>root</value> + </property> + <property> + <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.store.password</name> + <value>password</value> + </property> + <property> + <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.store.databaseDriver</name> + <value>org.hsqldb.jdbcDriver</value> + </property> + <property> + <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.batchSize</name> + <value>5</value> + </property> + <property> + <name>dt.application.StreamingWordExtract.operator.jdbcOutput.port.input.attr.TUPLE_CLASS</name> + <value>org.apache.apex.malhar.stream.sample.complete.PojoEvent</value> + </property> + <property> + <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.store.databaseUrl</name> + <value>jdbc:hsqldb:mem:test</value> + </property> + <property> + <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.tablename</name> + 
<value>Test</value> + </property> + + <!-- Properties for MaxPerKeyExamples --> + <property> + <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.store.userName</name> + <value>root</value> + </property> + <property> + <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.store.password</name> + <value>password</value> + </property> + <property> + <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.store.databaseDriver</name> + <value>org.hsqldb.jdbcDriver</value> + </property> + <property> + <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.batchSize</name> + <value>5</value> + </property> + <property> + <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.port.outputPort.attr.TUPLE_CLASS</name> + <value>org.apache.apex.malhar.stream.sample.cookbook.InputPojo</value> + </property> + <property> + <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.store.databaseUrl</name> + <value>jdbc:hsqldb:mem:test</value> + </property> + <property> + <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.tableName</name> + <value>InputTable</value> + </property> + <property> + <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.query</name> + <value>SELECT * FROM InputTable;</value> + </property> + <property> + <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.store.userName</name> + <value>root</value> + </property> + <property> + <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.store.password</name> + <value>password</value> + </property> + <property> + <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.store.databaseDriver</name> + <value>org.hsqldb.jdbcDriver</value> + </property> + <property> + <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.batchSize</name> + <value>5</value> + </property> + <property> + <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.port.input.attr.TUPLE_CLASS</name> + 
<value>org.apache.apex.malhar.stream.sample.cookbook.OutputPojo</value> + </property> + <property> + <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.store.databaseUrl</name> + <value>jdbc:hsqldb:mem:test</value> + </property> + <property> + <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.tablename</name> + <value>OutputTable</value> + </property> + +</configuration> + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java new file mode 100644 index 0000000..101953f --- /dev/null +++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.stream.sample; + +import java.util.concurrent.Callable; + +import org.junit.Assert; +import org.junit.Test; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; +import com.datatorrent.stram.StramLocalCluster; + +/** + * Test for {@link MinimalWordCount}. + */ +public class MinimalWordCountTest +{ + @Test + public void MinimalWordCountTest() throws Exception + { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + + MinimalWordCount app = new MinimalWordCount(); + + lma.prepareDAG(app, conf); + + LocalMode.Controller lc = lma.getController(); + ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>() + { + @Override + public Boolean call() throws Exception + { + return MinimalWordCount.Collector.isDone(); + } + }); + + lc.run(10000); + + Assert.assertTrue(MinimalWordCount.Collector.result.get("error") == 7); + Assert.assertTrue(MinimalWordCount.Collector.result.get("word") == 119); + Assert.assertTrue(MinimalWordCount.Collector.result.get("bye") == 1); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java new file mode 100644 index 0000000..952356f --- /dev/null +++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.sample; + +import java.util.Map; +import java.util.concurrent.Callable; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; +import com.datatorrent.lib.util.KeyValPair; +import com.datatorrent.stram.StramLocalCluster; + +/** + * Testing the TwitterAutoComplete Application. In order to run this test, you need to create an app + * at https://apps.twitter.com, then generate your consumer and access keys and tokens, and set the following properties + * for the application before running it: + * Your application consumer key, + * Your application consumer secret, + * Your twitter access token, and + * Your twitter access token secret. 
+ */ +public class WindowedWordCountTest +{ + @Test + public void WindowedWordCountTest() throws Exception + { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + lma.prepareDAG(new WindowedWordCount(), conf); + LocalMode.Controller lc = lma.getController(); + ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>() + { + @Override + public Boolean call() throws Exception + { + return WindowedWordCount.TextInput.isDone(); + } + }); + + lc.run(60000); + + Assert.assertEquals(127, countSum(WindowedWordCount.Collector.getResult())); + Assert.assertEquals(28, countSumWord(WindowedWordCount.Collector.getResult(), "word2")); + Assert.assertEquals(7, countSumWord(WindowedWordCount.Collector.getResult(), "error")); + Assert.assertEquals(21, countSumWord(WindowedWordCount.Collector.getResult(), "word9")); + Assert.assertEquals(1, countSumWord(WindowedWordCount.Collector.getResult(), "bye")); + } + + public long countSum(Map<KeyValPair<Long, String>, Long> map) + { + long sum = 0; + for (long count : map.values()) { + sum += count; + } + return sum; + } + + public long countSumWord(Map<KeyValPair<Long, String>, Long> map, String word) + { + long sum = 0; + for (Map.Entry<KeyValPair<Long, String>, Long> entry : map.entrySet()) { + if (entry.getKey().getValue().equals(word)) { + sum += entry.getValue(); + } + } + return sum; + } + +} + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java new file mode 100644 index 0000000..dc236f9 --- /dev/null +++ 
b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.sample.complete; + +import java.util.concurrent.Callable; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; +import com.datatorrent.stram.StramLocalCluster; + +/** + * Testing the AutoComplete Application + */ +public class AutoCompleteTest +{ + + @Test + public void AutoCompleteTest() throws Exception + { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + lma.prepareDAG(new AutoComplete(), conf); + LocalMode.Controller lc = lma.getController(); + + ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>() + { + @Override + public Boolean call() throws Exception + { + return AutoComplete.TweetsInput.isDone(); + } + }); + + lc.run(200000); + + Assert.assertTrue(AutoComplete.Collector.getResult().containsKey("chi")); + Assert.assertTrue(AutoComplete.Collector.getResult().containsKey("china")); + Assert.assertEquals(2, 
AutoComplete.Collector.getResult().get("china").get(0).getCount()); + Assert.assertEquals("China", AutoComplete.Collector.getResult().get("china").get(0).getValue()); + Assert.assertEquals(2, AutoComplete.Collector.getResult().get("d").size()); + Assert.assertEquals(3, AutoComplete.Collector.getResult().get("f").size()); + Assert.assertTrue(AutoComplete.Collector.getResult().get("f").get(0).getCount() >= AutoComplete.Collector.getResult().get("f").get(1).getCount()); + Assert.assertTrue(AutoComplete.Collector.getResult().get("f").get(1).getCount() >= AutoComplete.Collector.getResult().get("f").get(2).getCount()); + + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java new file mode 100644 index 0000000..bf9b030 --- /dev/null +++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.sample.complete; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.concurrent.Callable; + +import org.junit.After; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.google.common.base.Throwables; +import com.datatorrent.api.LocalMode; +import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore; +import com.datatorrent.stram.StramLocalCluster; + +/** + * Testing StreamingWordExtract application + */ +public class StreamingWordExtractTest +{ + private static final String TUPLE_CLASS = "org.apache.apex.malhar.stream.sample.complete.PojoEvent"; + private static final String DB_DRIVER = "org.h2.Driver"; + private static final String DB_URL = "jdbc:h2:~/test"; + private static final String TABLE_NAME = "Test"; + private static final String USER_NAME = "root"; + private static final String PSW = "password"; + + @BeforeClass + public static void setup() + { + try { + Class.forName(DB_DRIVER).newInstance(); + + Connection con = DriverManager.getConnection(DB_URL,USER_NAME,PSW); + Statement stmt = con.createStatement(); + + String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( " + + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, " + + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, " + + 
JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, " + + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", " + + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") " + + ")"; + stmt.executeUpdate(createMetaTable); + + String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME + + "(STRINGVALUE VARCHAR(255))"; + stmt.executeUpdate(createTable); + + } catch (Throwable e) { + throw Throwables.propagate(e); + } + } + + @After + public void cleanTable() + { + try { + Connection con = DriverManager.getConnection(DB_URL,USER_NAME,PSW); + Statement stmt = con.createStatement(); + String dropTable = "drop table " + TABLE_NAME; + stmt.executeUpdate(dropTable); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + public void setConfig(Configuration conf) + { + conf.set("dt.operator.jdbcOutput.prop.store.userName", USER_NAME); + conf.set("dt.operator.jdbcOutput.prop.store.password", PSW); + conf.set("dt.operator.jdbcOutput.prop.store.databaseDriver", DB_DRIVER); + conf.set("dt.operator.jdbcOutput.prop.batchSize", "5"); + conf.set("dt.operator.jdbcOutput.port.input.attr.TUPLE_CLASS", TUPLE_CLASS); + conf.set("dt.operator.jdbcOutput.prop.store.databaseUrl", DB_URL); + conf.set("dt.operator.jdbcOutput.prop.tablename", TABLE_NAME); + } + + public int getNumOfEventsInStore() + { + Connection con; + try { + con = DriverManager.getConnection(DB_URL,USER_NAME,PSW); + Statement stmt = con.createStatement(); + + String countQuery = "SELECT count(*) from " + TABLE_NAME; + ResultSet resultSet = stmt.executeQuery(countQuery); + resultSet.next(); + return resultSet.getInt(1); + } catch (SQLException e) { + throw new RuntimeException("fetching count", e); + } + } + + @Test + public void StreamingWordExtractTest() throws Exception + { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + setConfig(conf); + StreamingWordExtract app = new 
StreamingWordExtract(); + lma.prepareDAG(app, conf); + LocalMode.Controller lc = lma.getController(); + + ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>() + { + @Override + public Boolean call() throws Exception + { + return getNumOfEventsInStore() == 36; + } + }); + + lc.run(10000); + + Assert.assertEquals(app.getWordCount(), getNumOfEventsInStore()); + Assert.assertEquals(app.getEntriesMapped(), getNumOfEventsInStore()); + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java new file mode 100644 index 0000000..f8ec086 --- /dev/null +++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.stream.sample.complete; + +import java.util.List; +import java.util.concurrent.Callable; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; +import com.datatorrent.stram.StramLocalCluster; + +/** + * Testing the {@link TopWikipediaSessions} Application. + */ +public class TopWikipediaSessionsTest +{ + @Test + public void TopWikipediaSessionsTest() throws Exception + { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + lma.prepareDAG(new TopWikipediaSessions(), conf); + LocalMode.Controller lc = lma.getController(); + + ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>() + { + @Override + public Boolean call() throws Exception + { + return TopWikipediaSessions.SessionGen.getTupleCount() >= 250; + } + }); + + lc.run(30000); + + for (int i = 0; i < TopWikipediaSessions.Collector.getResult().size(); i++) { + Assert.assertTrue(isInOrder(TopWikipediaSessions.Collector.getResult().get(i))); + } + } + + public boolean isInOrder(List<TopWikipediaSessions.TempWrapper> input) + { + if (input.size() == 0 || input.size() == 1) { + return true; + } + for (int i = 0; i < input.size() - 2; i++) { + if (input.get(i).getValue().getValue() < input.get(i + 1).getValue().getValue()) { + return false; + } + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java new file mode 100644 index 0000000..e363ca7 --- /dev/null +++ 
b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.sample.complete; + +import java.util.Map; +import java.util.concurrent.Callable; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; +import com.datatorrent.lib.util.KeyValPair; +import com.datatorrent.stram.StramLocalCluster; + +/** + * Testing the {@link TrafficRoutes} Application. 
+ */ +public class TrafficRoutesTest +{ + + @Test + public void TrafficRoutesTest() throws Exception + { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + lma.prepareDAG(new TrafficRoutes(), conf); + LocalMode.Controller lc = lma.getController(); + + ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>() + { + @Override + public Boolean call() throws Exception + { + return TrafficRoutes.InfoGen.getTupleCount() >= 100; + } + }); + + lc.run(60000); + + Assert.assertTrue(!TrafficRoutes.Collector.getResult().isEmpty()); + for (Map.Entry<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> entry : TrafficRoutes.Collector.getResult().entrySet()) { + Assert.assertTrue(entry.getValue().getKey() <= 75); + Assert.assertTrue(entry.getValue().getKey() >= 55); + Assert.assertTrue(entry.getKey().getValue().equals("SDRoute1") || entry.getKey().getValue().equals("SDRoute2")); + } + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoCompleteTest.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoCompleteTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoCompleteTest.java new file mode 100644 index 0000000..9ba2f25 --- /dev/null +++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoCompleteTest.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+/**
+ * Testing the TwitterAutoComplete Application. In order to run this test, you need to create an app
+ * at https://apps.twitter.com, then generate your consumer and access keys and tokens, and set the following properties
+ * for the application before running it:
+ * Your application consumer key,
+ * Your application consumer secret,
+ * Your twitter access token, and
+ * Your twitter access token secret.
+ *
+ * This test is mainly for local demonstration purposes and is therefore marked {@code @Ignore}. The application runs
+ * for 1 minute by default; change the duration passed to {@code lc.run(...)} to run it for a different time.
+ */
+public class TwitterAutoCompleteTest
+{
+  private static final Logger logger = LoggerFactory.getLogger(TwitterAutoCompleteTest.class);
+
+  @Test
+  @Ignore
+  public void TwitterAutoCompleteTest() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    // Uncomment the following lines and replace YOUR_XXX with the credentials of your Twitter application.
+    //conf.set("dt.application.TwitterAutoComplete.operator.tweetSampler.consumerKey", "YOUR_CONSUMERKEY");
+    //conf.set("dt.application.TwitterAutoComplete.operator.tweetSampler.consumerSecret", "YOUR_CONSUMERSECRET");
+    //conf.set("dt.application.TwitterAutoComplete.operator.tweetSampler.accessToken", "YOUR_ACCESSTOKEN");
+    //conf.set("dt.application.TwitterAutoComplete.operator.tweetSampler.accessTokenSecret", "YOUR_TOKENSECRET");
+    lma.prepareDAG(new TwitterAutoComplete(), conf);
+    LocalMode.Controller lc = lma.getController();
+    long start = System.currentTimeMillis();
+    lc.run(60000); // Set your desired time to run the application here.
+    long end = System.currentTimeMillis();
+    long time = end - start;
+    logger.info("Test used {} ms", time);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java
new file mode 100644
index 0000000..5858013
--- /dev/null
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+import java.util.concurrent.Callable;
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * Test for {@link CombinePerKeyExamples}: checks the last collected corpus contains "1, 2, 3, 4, 5, 6, 7, 8".
+ */
+public class CombinePerKeyExamplesTest
+{
+  @Test
+  public void CombinePerKeyExamplesTest() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    lma.prepareDAG(new CombinePerKeyExamples(), conf);
+
+    LocalMode.Controller lc = lma.getController();
+    // Stop the local cluster as soon as CombinePerKeyExamples.SampleInput.getI() reaches 1.
+    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        return CombinePerKeyExamples.SampleInput.getI() >= 1;
+      }
+    });
+    lc.run(100000);
+
+    // Fail with a clear message, instead of an IndexOutOfBoundsException, when nothing was collected.
+    Assert.assertFalse("no results were collected", CombinePerKeyExamples.Collector.result.isEmpty());
+    Assert.assertTrue(CombinePerKeyExamples.Collector.result.get(CombinePerKeyExamples.Collector.result.size() - 1).getCorpus().contains("1, 2, 3, 4, 5, 6, 7, 8"));
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java
new file mode 100644
index 0000000..ed4ddb4
--- /dev/null
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+import java.util.concurrent.Callable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * Test for {@link DeDupExample}: runs the application in local mode until its collector reports that it is done,
+ * then checks the size of the collected result.
+ */
+public class DeDupExampleTest
+{
+  /** Size the collector's result value is expected to have once the application has finished. */
+  private static final int EXPECTED_RESULT_SIZE = 9;
+
+  @Test
+  public void DeDupExampleTest() throws Exception
+  {
+    LocalMode localMode = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    localMode.prepareDAG(new DeDupExample(), conf);
+
+    LocalMode.Controller controller = localMode.getController();
+    // Shut the local cluster down as soon as the collector reports that it is done.
+    ((StramLocalCluster)controller).setExitCondition(new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        return DeDupExample.Collector.isDone();
+      }
+    });
+    controller.run(50000);
+
+    Assert.assertEquals(EXPECTED_RESULT_SIZE, DeDupExample.Collector.getResult().getValue().size());
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamplesTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamplesTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamplesTest.java
new file mode 100644
index 0000000..51981de
--- /dev/null
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamplesTest.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Throwables;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * Test for MaxPerKeyExamples Application: seeds an H2 input table, runs the application in local mode and checks
+ * the per-month maximum MEANTEMP values that end up in the output table.
+ */
+public class MaxPerKeyExamplesTest
+{
+
+  private static final String INPUT_TUPLE_CLASS = "org.apache.apex.malhar.stream.sample.cookbook.InputPojo";
+  private static final String OUTPUT_TUPLE_CLASS = "org.apache.apex.malhar.stream.sample.cookbook.OutputPojo";
+  private static final String DB_DRIVER = "org.h2.Driver";
+  private static final String DB_URL = "jdbc:h2:~/test";
+  private static final String INPUT_TABLE = "InputTable";
+  private static final String OUTPUT_TABLE = "OutputTable";
+  private static final String USER_NAME = "root";
+  private static final String PSW = "password";
+  private static final String QUERY = "SELECT * FROM " + INPUT_TABLE + ";";
+
+  // Expected maximum MEANTEMP for month 6 and month 7, in that order (see the rows inserted in setup()).
+  private static final double[] MEANTEMPS = {85.3, 75.4};
+
+  /** Opens a new connection to the H2 test database; the caller must close it. */
+  private static Connection connect() throws SQLException
+  {
+    return DriverManager.getConnection(DB_URL, USER_NAME, PSW);
+  }
+
+  /** Creates the transactional-store meta table and the input/output tables, then seeds the input table. */
+  @BeforeClass
+  public static void setup()
+  {
+    try {
+      Class.forName(DB_DRIVER);
+      // try-with-resources closes the statement and connection even if one of the updates fails.
+      try (Connection con = connect(); Statement stmt = con.createStatement()) {
+        String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
+            + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
+            + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
+            + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, "
+            + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", "
+            + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
+            + ")";
+        stmt.executeUpdate(createMetaTable);
+
+        String createInputTable = "CREATE TABLE IF NOT EXISTS " + INPUT_TABLE
+            + "(MONTH INT(2) not NULL, DAY INT(2), YEAR INT(4), MEANTEMP DOUBLE(10) )";
+        stmt.executeUpdate(createInputTable);
+
+        String createOutputTable = "CREATE TABLE IF NOT EXISTS " + OUTPUT_TABLE
+            + "(MONTH INT(2) not NULL, MEANTEMP DOUBLE(10) )";
+        stmt.executeUpdate(createOutputTable);
+
+        String cleanTable = "truncate table " + INPUT_TABLE;
+        stmt.executeUpdate(cleanTable);
+
+        // Month 6 gets two readings; the test expects only the larger one (85.3) in the output.
+        String sql = "INSERT INTO " + INPUT_TABLE + " VALUES (6, 21, 2014, 85.3)";
+        stmt.executeUpdate(sql);
+        sql = "INSERT INTO " + INPUT_TABLE + " VALUES (7, 20, 2014, 75.4)";
+        stmt.executeUpdate(sql);
+        sql = "INSERT INTO " + INPUT_TABLE + " VALUES (6, 18, 2014, 45.3)";
+        stmt.executeUpdate(sql);
+      }
+    } catch (Throwable e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  /** Drops the input and output tables created by {@link #setup()}. */
+  @AfterClass
+  public static void cleanup()
+  {
+    try {
+      Class.forName(DB_DRIVER);
+      try (Connection con = connect(); Statement stmt = con.createStatement()) {
+        String dropInputTable = "DROP TABLE " + INPUT_TABLE;
+        stmt.executeUpdate(dropInputTable);
+
+        String dropOutputTable = "DROP TABLE " + OUTPUT_TABLE;
+        stmt.executeUpdate(dropOutputTable);
+      }
+    } catch (Throwable e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  /** Points the jdbcInput and jdbcOutput operators of the application at the H2 test database. */
+  public void setConfig(Configuration conf)
+  {
+    conf.set("dt.operator.jdbcInput.prop.store.userName", USER_NAME);
+    conf.set("dt.operator.jdbcInput.prop.store.password", PSW);
+    conf.set("dt.operator.jdbcInput.prop.store.databaseDriver", DB_DRIVER);
+    conf.set("dt.operator.jdbcInput.prop.batchSize", "5");
+    conf.set("dt.operator.jdbcInput.port.outputPort.attr.TUPLE_CLASS", INPUT_TUPLE_CLASS);
+    conf.set("dt.operator.jdbcInput.prop.store.databaseUrl", DB_URL);
+    conf.set("dt.operator.jdbcInput.prop.tableName", INPUT_TABLE);
+    conf.set("dt.operator.jdbcInput.prop.query", QUERY);
+
+    conf.set("dt.operator.jdbcOutput.prop.store.userName", USER_NAME);
+    conf.set("dt.operator.jdbcOutput.prop.store.password", PSW);
+    conf.set("dt.operator.jdbcOutput.prop.store.databaseDriver", DB_DRIVER);
+    conf.set("dt.operator.jdbcOutput.prop.batchSize", "5");
+    conf.set("dt.operator.jdbcOutput.port.input.attr.TUPLE_CLASS", OUTPUT_TUPLE_CLASS);
+    conf.set("dt.operator.jdbcOutput.prop.store.databaseUrl", DB_URL);
+    conf.set("dt.operator.jdbcOutput.prop.tablename", OUTPUT_TABLE);
+  }
+
+  /** Returns the number of distinct (MONTH, MEANTEMP) rows in the output table. */
+  public int getNumEntries()
+  {
+    String countQuery = "SELECT count(DISTINCT (MONTH, MEANTEMP)) from " + OUTPUT_TABLE;
+    // Polled repeatedly by the exit condition, so every JDBC resource must be released on each call.
+    try (Connection con = connect();
+        Statement stmt = con.createStatement();
+        ResultSet resultSet = stmt.executeQuery(countQuery)) {
+      resultSet.next();
+      return resultSet.getInt(1);
+    } catch (SQLException e) {
+      throw new RuntimeException("fetching count", e);
+    }
+  }
+
+  /** Returns the MEANTEMP values stored in the output table, keyed by MONTH. */
+  public Map<Integer, Double> getMaxMeanTemp()
+  {
+    Map<Integer, Double> result = new HashMap<>();
+    String selectQuery = "SELECT DISTINCT * from " + OUTPUT_TABLE;
+    try (Connection con = connect();
+        Statement stmt = con.createStatement();
+        ResultSet resultSet = stmt.executeQuery(selectQuery)) {
+      while (resultSet.next()) {
+        result.put(resultSet.getInt("MONTH"), resultSet.getDouble("MEANTEMP"));
+      }
+      return result;
+    } catch (SQLException e) {
+      throw new RuntimeException("fetching max mean temperatures", e);
+    }
+  }
+
+  @Test
+  public void MaxPerKeyExampleTest() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    setConfig(conf);
+    lma.prepareDAG(new MaxPerKeyExamples(), conf);
+
+    LocalMode.Controller lc = lma.getController();
+    // Stop once the output table holds two distinct (MONTH, MEANTEMP) rows.
+    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        return getNumEntries() == 2;
+      }
+    });
+
+    lc.run(5000);
+
+    // Query once and fail with a clear message (rather than an unboxing NPE) if a month is missing.
+    Map<Integer, Double> maxMeanTemps = getMaxMeanTemp();
+    Assert.assertTrue("missing month 6 in " + maxMeanTemps, maxMeanTemps.containsKey(6));
+    Assert.assertTrue("missing month 7 in " + maxMeanTemps, maxMeanTemps.containsKey(7));
+    double[] result = {maxMeanTemps.get(6), maxMeanTemps.get(7)};
+    Assert.assertArrayEquals(MEANTEMPS, result, 0.0);
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/test/resources/data/word.txt
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/resources/data/word.txt b/demos/highlevelapi/src/test/resources/data/word.txt
new file mode 100644
index 0000000..7e28409
--- /dev/null
+++ b/demos/highlevelapi/src/test/resources/data/word.txt
@@ -0,0 +1,2 @@
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error
