http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java deleted file mode 100644 index bfdb268..0000000 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java +++ /dev/null @@ -1,285 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.apex.malhar.stream.sample.cookbook; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.apex.malhar.lib.window.TriggerOption; -import org.apache.apex.malhar.lib.window.Tuple; -import org.apache.apex.malhar.lib.window.WindowOption; -import org.apache.apex.malhar.lib.window.accumulation.ReduceFn; -import org.apache.apex.malhar.stream.api.ApexStream; -import org.apache.apex.malhar.stream.api.CompositeStreamTransform; -import org.apache.apex.malhar.stream.api.WindowedStream; -import org.apache.apex.malhar.stream.api.function.Function; -import org.apache.apex.malhar.stream.api.impl.StreamFactory; - -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.Context; -import com.datatorrent.api.DAG; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.InputOperator; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.lib.util.KeyValPair; - -import static org.apache.apex.malhar.stream.api.Option.Options.name; - -/** - * An example that reads the public 'Shakespeare' data, and for each word in - * the dataset that is over a given length, generates a string containing the - * list of play names in which that word appears - * - * <p>Concepts: the combine transform, which lets you combine the values in a - * key-grouped Collection - * - * - * @since 3.5.0 - */ -@ApplicationAnnotation(name = "CombinePerKeyExamples") -public class CombinePerKeyExamples implements StreamingApplication -{ - // Use the shakespeare public BigQuery sample - private static final String SHAKESPEARE_TABLE = "publicdata:samples.shakespeare"; - // We'll track words >= this word length across all plays in the table. - private static final int MIN_WORD_LENGTH = 0; - - /** - * Examines each row in the input table. 
If the word is greater than or equal to MIN_WORD_LENGTH, - * outputs word, play_name. - */ - static class ExtractLargeWordsFn implements Function.MapFunction<SampleBean, KeyValPair<String, String>> - { - - @Override - public KeyValPair<String, String> f(SampleBean input) - { - String playName = input.getCorpus(); - String word = input.getWord(); - if (word.length() >= MIN_WORD_LENGTH) { - return new KeyValPair<>(word, playName); - } else { - return null; - } - } - } - - - /** - * Prepares the output data which is in same bean - */ - static class FormatShakespeareOutputFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, String>>, SampleBean> - { - @Override - public SampleBean f(Tuple.WindowedTuple<KeyValPair<String, String>> input) - { - return new SampleBean(input.getValue().getKey(), input.getValue().getValue()); - } - } - - /** - * A reduce function to concat two strings together. - */ - public static class Concat extends ReduceFn<String> - { - @Override - public String reduce(String input1, String input2) - { - return input1 + ", " + input2; - } - } - - /** - * Reads the public 'Shakespeare' data, and for each word in the dataset - * over a given length, generates a string containing the list of play names - * in which that word appears. - */ - private static class PlaysForWord extends CompositeStreamTransform<ApexStream<SampleBean>, WindowedStream<SampleBean>> - { - - @Override - public WindowedStream<SampleBean> compose(ApexStream<SampleBean> inputStream) - { - return inputStream - // Extract words from the input SampleBeam stream. - .map(new ExtractLargeWordsFn(), name("ExtractLargeWordsFn")) - - // Apply window and trigger option to the streams. - .window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1)) - - // Apply reduceByKey transformation to concat the names of all the plays that a word has appeared in together. 
- .reduceByKey(new Concat(), new Function.ToKeyValue<KeyValPair<String,String>, String, String>() - { - @Override - public Tuple<KeyValPair<String, String>> f(KeyValPair<String, String> input) - { - return new Tuple.PlainTuple<KeyValPair<String, String>>(input); - } - }, name("Concat")) - - // Format the output back to a SampleBeam object. - .map(new FormatShakespeareOutputFn(), name("FormatShakespeareOutputFn")); - } - } - - - /** - * A Java Beam class that contains information about a word appears in a corpus written by Shakespeare. - */ - public static class SampleBean - { - - public SampleBean() - { - - } - - public SampleBean(String word, String corpus) - { - this.word = word; - this.corpus = corpus; - } - - @Override - public String toString() - { - return this.word + " : " + this.corpus; - } - - private String word; - - private String corpus; - - public void setWord(String word) - { - this.word = word; - } - - public String getWord() - { - return word; - } - - public void setCorpus(String corpus) - { - this.corpus = corpus; - } - - public String getCorpus() - { - return corpus; - } - } - - /** - * A dummy info generator to generate {@link SampleBean} objects to mimic reading from real 'Shakespeare' - * data. 
- */ - public static class SampleInput extends BaseOperator implements InputOperator - { - - public final transient DefaultOutputPort<SampleBean> beanOutput = new DefaultOutputPort(); - private String[] words = new String[]{"A", "B", "C", "D", "E", "F", "G"}; - private String[] corpuses = new String[]{"1", "2", "3", "4", "5", "6", "7", "8"}; - private static int i; - - @Override - public void setup(Context.OperatorContext context) - { - super.setup(context); - i = 0; - } - - @Override - public void emitTuples() - { - while (i < 1) { - for (String word : words) { - for (String corpus : corpuses) { - try { - Thread.sleep(50); - beanOutput.emit(new SampleBean(word, corpus)); - } catch (Exception e) { - // Ignore it - } - } - } - i++; - } - - } - } - - public static class Collector extends BaseOperator - { - private static List<SampleBean> result; - private static boolean done = false; - - public static List<SampleBean> getResult() - { - return result; - } - - public static boolean isDone() - { - return done; - } - - @Override - public void setup(Context.OperatorContext context) - { - result = new ArrayList<>(); - done = false; - } - - public final transient DefaultInputPort<SampleBean> input = new DefaultInputPort<SampleBean>() - { - @Override - public void process(SampleBean tuple) - { - if (tuple.getWord().equals("F")) { - done = true; - } - result.add(tuple); - } - }; - } - - /** - * Populate dag using High-Level API. - * @param dag - * @param conf - */ - @Override - public void populateDAG(DAG dag, Configuration conf) - { - SampleInput input = new SampleInput(); - Collector collector = new Collector(); - StreamFactory.fromInput(input, input.beanOutput, name("input")) - .addCompositeStreams(new PlaysForWord()) - .print(name("console")) - .endWith(collector, collector.input, name("Collector")) - .populateDag(dag); - - } -}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java deleted file mode 100644 index 4df5fe7..0000000 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java +++ /dev/null @@ -1,127 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.apex.malhar.stream.sample.cookbook; - -import java.util.Arrays; -import java.util.List; - -import org.joda.time.Duration; - -import org.apache.apex.malhar.lib.window.TriggerOption; -import org.apache.apex.malhar.lib.window.Tuple; -import org.apache.apex.malhar.lib.window.WindowOption; -import org.apache.apex.malhar.lib.window.accumulation.RemoveDuplicates; -import org.apache.apex.malhar.stream.api.ApexStream; -import org.apache.apex.malhar.stream.api.function.Function; -import org.apache.apex.malhar.stream.api.impl.StreamFactory; -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.Context; -import com.datatorrent.api.DAG; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.common.util.BaseOperator; - -import static org.apache.apex.malhar.stream.api.Option.Options.name; - -/** - * Beam DeDupExample. - * - * @since 3.5.0 - */ -@ApplicationAnnotation(name = "DeDupExample") -public class DeDupExample implements StreamingApplication -{ - - public static class Collector extends BaseOperator - { - private static Tuple.WindowedTuple<List<String>> result; - private static boolean done = false; - - public static Tuple.WindowedTuple<List<String>> getResult() - { - return result; - } - - public static boolean isDone() - { - return done; - } - - @Override - public void setup(Context.OperatorContext context) - { - super.setup(context); - result = new Tuple.WindowedTuple<>(); - done = false; - } - - public transient DefaultInputPort<Tuple.WindowedTuple<List<String>>> input = new DefaultInputPort<Tuple.WindowedTuple<List<String>>>() - { - @Override - public void process(Tuple.WindowedTuple<List<String>> tuple) - { - result = tuple; - if (result.getValue().contains("bye")) { - done = true; - } - } - }; - } - - @Override - public void populateDAG(DAG dag, Configuration conf) - { - Collector collector = new 
Collector(); - - // Create a stream that reads from files in a local folder and output lines one by one to downstream. - ApexStream<String> stream = StreamFactory.fromFolder("./src/test/resources/wordcount", name("textInput")) - - // Extract all the words from the input line of text. - .flatMap(new Function.FlatMapFunction<String, String>() - { - @Override - public Iterable<String> f(String input) - { - return Arrays.asList(input.split("[\\p{Punct}\\s]+")); - } - }, name("ExtractWords")) - - // Change the words to lower case, also shutdown the app when the word "bye" is detected. - .map(new Function.MapFunction<String, String>() - { - @Override - public String f(String input) - { - return input.toLowerCase(); - } - }, name("ToLowerCase")); - - // Apply window and trigger option. - stream.window(new WindowOption.GlobalWindow(), - new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(1))) - - // Remove the duplicate words and print out the result. - .accumulate(new RemoveDuplicates<String>(), name("RemoveDuplicates")) - .print(name("console")) - .endWith(collector, collector.input) - .populateDag(dag); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java deleted file mode 100644 index 834964c..0000000 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.stream.sample.cookbook; - -/** - * Tuple class for JDBC input of {@link MaxPerKeyExamples}. - * - * @since 3.5.0 - */ -public class InputPojo extends Object -{ - private int month; - private int day; - private int year; - private double meanTemp; - - @Override - public String toString() - { - return "PojoEvent [month=" + getMonth() + ", day=" + getDay() + ", year=" + getYear() + ", meanTemp=" + getMeanTemp() + "]"; - } - - public void setMonth(int month) - { - this.month = month; - } - - public int getMonth() - { - return this.month; - } - - public void setDay(int day) - { - this.day = day; - } - - public int getDay() - { - return day; - } - - public void setYear(int year) - { - this.year = year; - } - - public int getYear() - { - return year; - } - - public void setMeanTemp(double meanTemp) - { - this.meanTemp = meanTemp; - } - - public double getMeanTemp() - { - return meanTemp; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java 
b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java deleted file mode 100644 index 9fd9495..0000000 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java +++ /dev/null @@ -1,205 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.apex.malhar.stream.sample.cookbook; - -import java.util.List; - -import org.apache.apex.malhar.lib.window.TriggerOption; -import org.apache.apex.malhar.lib.window.Tuple; -import org.apache.apex.malhar.lib.window.Window; -import org.apache.apex.malhar.lib.window.WindowOption; -import org.apache.apex.malhar.lib.window.accumulation.Max; -import org.apache.apex.malhar.stream.api.ApexStream; -import org.apache.apex.malhar.stream.api.CompositeStreamTransform; -import org.apache.apex.malhar.stream.api.WindowedStream; -import org.apache.apex.malhar.stream.api.function.Function; -import org.apache.apex.malhar.stream.api.impl.StreamFactory; -import org.apache.hadoop.conf.Configuration; - -import static java.sql.Types.DOUBLE; -import static java.sql.Types.INTEGER; - -import com.google.common.collect.Lists; -import com.datatorrent.api.DAG; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.lib.db.jdbc.JdbcFieldInfo; -import com.datatorrent.lib.db.jdbc.JdbcPOJOInputOperator; -import com.datatorrent.lib.db.jdbc.JdbcPOJOInsertOutputOperator; -import com.datatorrent.lib.db.jdbc.JdbcStore; -import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore; -import com.datatorrent.lib.util.FieldInfo; -import com.datatorrent.lib.util.KeyValPair; - -import static org.apache.apex.malhar.stream.api.Option.Options.name; - -/** - * MaxPerKeyExamples Application from Beam - * - * @since 3.5.0 - */ -@ApplicationAnnotation(name = "MaxPerKeyExamples") -public class MaxPerKeyExamples implements StreamingApplication -{ - - /** - * A map function to extract the mean temperature from {@link InputPojo}. 
- */ - public static class ExtractTempFn implements Function.MapFunction<InputPojo, KeyValPair<Integer, Double>> - { - @Override - public KeyValPair<Integer, Double> f(InputPojo row) - { - Integer month = row.getMonth(); - Double meanTemp = row.getMeanTemp(); - return new KeyValPair<Integer, Double>(month, meanTemp); - } - } - - - /** - * A map function to format output to {@link OutputPojo}. - */ - public static class FormatMaxesFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<Integer, Double>>, OutputPojo> - { - @Override - public OutputPojo f(Tuple.WindowedTuple<KeyValPair<Integer, Double>> input) - { - OutputPojo row = new OutputPojo(); - row.setMonth(input.getValue().getKey()); - row.setMeanTemp(input.getValue().getValue()); - return row; - } - } - - /** - * A composite transformation to perform three tasks: - * 1. extract the month and its mean temperature from input pojo. - * 2. find the maximum mean temperature for every month. - * 3. format the result to a output pojo object. - */ - public static class MaxMeanTemp extends CompositeStreamTransform<WindowedStream<InputPojo>, WindowedStream<OutputPojo>> - { - @Override - public WindowedStream<OutputPojo> compose(WindowedStream<InputPojo> rows) - { - // InputPojo... => <month, meanTemp> ... - WindowedStream<KeyValPair<Integer, Double>> temps = rows.map(new ExtractTempFn(), name("ExtractTempFn")); - - // month, meanTemp... => <month, max mean temp>... - WindowedStream<Tuple.WindowedTuple<KeyValPair<Integer, Double>>> tempMaxes = - temps.accumulateByKey(new Max<Double>(), - new Function.ToKeyValue<KeyValPair<Integer, Double>, Integer, Double>() - { - @Override - public Tuple<KeyValPair<Integer, Double>> f(KeyValPair<Integer, Double> input) - { - return new Tuple.WindowedTuple<KeyValPair<Integer, Double>>(Window.GlobalWindow.INSTANCE, input); - } - }, name("MaxPerMonth")); - - // <month, max>... => OutputPojo... 
- WindowedStream<OutputPojo> results = tempMaxes.map(new FormatMaxesFn(), name("FormatMaxesFn")); - - return results; - } - } - - /** - * Method to set field info for {@link JdbcPOJOInputOperator}. - * @return - */ - private List<FieldInfo> addInputFieldInfos() - { - List<FieldInfo> fieldInfos = Lists.newArrayList(); - fieldInfos.add(new FieldInfo("MONTH", "month", FieldInfo.SupportType.INTEGER)); - fieldInfos.add(new FieldInfo("DAY", "day", FieldInfo.SupportType.INTEGER)); - fieldInfos.add(new FieldInfo("YEAR", "year", FieldInfo.SupportType.INTEGER)); - fieldInfos.add(new FieldInfo("MEANTEMP", "meanTemp", FieldInfo.SupportType.DOUBLE)); - return fieldInfos; - } - - /** - * Method to set field info for {@link JdbcPOJOInsertOutputOperator}. - * @return - */ - private List<JdbcFieldInfo> addOutputFieldInfos() - { - List<JdbcFieldInfo> fieldInfos = Lists.newArrayList(); - fieldInfos.add(new JdbcFieldInfo("MONTH", "month", JdbcFieldInfo.SupportType.INTEGER, INTEGER)); - fieldInfos.add(new JdbcFieldInfo("MEANTEMP", "meanTemp", JdbcFieldInfo.SupportType.DOUBLE, DOUBLE)); - return fieldInfos; - } - - - /** - * Populate the dag using High-Level API. - * @param dag - * @param conf - */ - @Override - public void populateDAG(DAG dag, Configuration conf) - { - JdbcPOJOInputOperator jdbcInput = new JdbcPOJOInputOperator(); - jdbcInput.setFieldInfos(addInputFieldInfos()); - - JdbcStore store = new JdbcStore(); - jdbcInput.setStore(store); - - JdbcPOJOInsertOutputOperator jdbcOutput = new JdbcPOJOInsertOutputOperator(); - jdbcOutput.setFieldInfos(addOutputFieldInfos()); - JdbcTransactionalStore outputStore = new JdbcTransactionalStore(); - jdbcOutput.setStore(outputStore); - - // Create stream that reads from a Jdbc Input. - ApexStream<Object> stream = StreamFactory.fromInput(jdbcInput, jdbcInput.outputPort, name("jdbcInput")) - - // Apply window and trigger option to the stream. 
- .window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1)) - - // Because Jdbc Input sends out a stream of Object, need to cast them to InputPojo. - .map(new Function.MapFunction<Object, InputPojo>() - { - @Override - public InputPojo f(Object input) - { - return (InputPojo)input; - } - }, name("ObjectToInputPojo")) - - // Plug in the composite transformation to the stream to calculate the maximum temperature for each month. - .addCompositeStreams(new MaxMeanTemp()) - - // Cast the resulted OutputPojo to Object for Jdbc Output to consume. - .map(new Function.MapFunction<OutputPojo, Object>() - { - @Override - public Object f(OutputPojo input) - { - return (Object)input; - } - }, name("OutputPojoToObject")) - - // Output the result to Jdbc Output. - .endWith(jdbcOutput, jdbcOutput.input, name("jdbcOutput")); - - stream.populateDag(dag); - - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java deleted file mode 100644 index f3d0c64..0000000 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.stream.sample.cookbook; - -/** - * OutputPojo Tuple Class for jdbcOutput of {@link MaxPerKeyExamples}. - * - * @since 3.5.0 - */ -public class OutputPojo -{ - private int month; - private double meanTemp; - - @Override - public String toString() - { - return "PojoEvent [month=" + getMonth() + ", meanTemp=" + getMeanTemp() + "]"; - } - - public void setMonth(int month) - { - this.month = month; - } - - public int getMonth() - { - return this.month; - } - - public void setMeanTemp(double meanTemp) - { - this.meanTemp = meanTemp; - } - - public double getMeanTemp() - { - return meanTemp; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java deleted file mode 100644 index 962faa5..0000000 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java +++ /dev/null @@ -1,577 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.stream.sample.cookbook; - -import java.util.Date; -import java.util.Objects; - -import org.joda.time.Duration; - -import org.apache.apex.malhar.lib.window.TriggerOption; -import org.apache.apex.malhar.lib.window.Tuple; -import org.apache.apex.malhar.lib.window.WindowOption; -import org.apache.apex.malhar.stream.api.ApexStream; -import org.apache.apex.malhar.stream.api.CompositeStreamTransform; -import org.apache.apex.malhar.stream.api.WindowedStream; -import org.apache.apex.malhar.stream.api.function.Function; -import org.apache.apex.malhar.stream.api.impl.StreamFactory; - -import com.datatorrent.lib.util.KeyValPair; - -/** - * This example illustrates the basic concepts behind triggering. It shows how to use different - * trigger definitions to produce partial (speculative) results before all the data is processed and - * to control when updated results are produced for late data. The example performs a streaming - * analysis of the data coming in from PubSub and writes the results to BigQuery. It divides the - * data into {@link Window windows} to be processed, and demonstrates using various kinds of - * {@link org.apache.beam.sdk.transforms.windowing.Trigger triggers} to control when the results for - * each window are emitted. - * - * <p> This example uses a portion of real traffic data from San Diego freeways. 
It contains - * readings from sensor stations set up along each freeway. Each sensor reading includes a - * calculation of the 'total flow' across all lanes in that freeway direction. - * - * <p> Concepts: - * <pre> - * 1. The default triggering behavior - * 2. Late data with the default trigger - * 3. How to get speculative estimates - * 4. Combining late data and speculative estimates - * </pre> - * - * <p> Before running this example, it will be useful to familiarize yourself with Dataflow triggers - * and understand the concept of 'late data', - * See: <a href="https://cloud.google.com/dataflow/model/triggers"> - * https://cloud.google.com/dataflow/model/triggers </a> and - * <a href="https://cloud.google.com/dataflow/model/windowing#Advanced"> - * https://cloud.google.com/dataflow/model/windowing#Advanced </a> - * - * <p> The example pipeline reads data from a Pub/Sub topic. By default, running the example will - * also run an auxiliary pipeline to inject data from the default {@code --input} file to the - * {@code --pubsubTopic}. The auxiliary pipeline puts a timestamp on the injected data so that the - * example pipeline can operate on <i>event time</i> (rather than arrival time). The auxiliary - * pipeline also randomly simulates late data, by setting the timestamps of some of the data - * elements to be in the past. You may override the default {@code --input} with the file of your - * choosing or set {@code --input=""} which will disable the automatic Pub/Sub injection, and allow - * you to use a separate tool to publish to the given topic. - * - * <p> The example is configured to use the default Pub/Sub topic and the default BigQuery table - * from the example common package (there are no defaults for a general Dataflow pipeline). - * You can override them by using the {@code --pubsubTopic}, {@code --bigQueryDataset}, and - * {@code --bigQueryTable} options. If the Pub/Sub topic or the BigQuery table do not exist, - * the example will try to create them. 
- * - * <p> The pipeline outputs its results to a BigQuery table. - * Here are some queries you can use to see interesting results: - * Replace {@code <enter_table_name>} in the query below with the name of the BigQuery table. - * Replace {@code <enter_window_interval>} in the query below with the window interval. - * - * <p> To see the results of the default trigger, - * Note: When you start up your pipeline, you'll initially see results from 'late' data. Wait after - * the window duration, until the first pane of non-late data has been emitted, to see more - * interesting results. - * {@code SELECT * FROM enter_table_name WHERE triggerType = "default" ORDER BY window DESC} - * - * <p> To see the late data i.e. dropped by the default trigger, - * {@code SELECT * FROM <enter_table_name> WHERE triggerType = "withAllowedLateness" and - * (timing = "LATE" or timing = "ON_TIME") and freeway = "5" ORDER BY window DESC, processingTime} - * - * <p>To see the the difference between accumulation mode and discarding mode, - * {@code SELECT * FROM <enter_table_name> WHERE (timing = "LATE" or timing = "ON_TIME") AND - * (triggerType = "withAllowedLateness" or triggerType = "sequential") and freeway = "5" ORDER BY - * window DESC, processingTime} - * - * <p> To see speculative results every minute, - * {@code SELECT * FROM <enter_table_name> WHERE triggerType = "speculative" and freeway = "5" - * ORDER BY window DESC, processingTime} - * - * <p> To see speculative results every five minutes after the end of the window - * {@code SELECT * FROM <enter_table_name> WHERE triggerType = "sequential" and timing != "EARLY" - * and freeway = "5" ORDER BY window DESC, processingTime} - * - * <p> To see the first and the last pane for a freeway in a window for all the trigger types, - * {@code SELECT * FROM <enter_table_name> WHERE (isFirst = true or isLast = true) ORDER BY window} - * - * <p> To reduce the number of results for each query we can add additional where clauses. 
- * For example, to see the results of the default trigger, - * {@code SELECT * FROM <enter_table_name> WHERE triggerType = "default" AND freeway = "5" AND - * window = "<enter_window_interval>"} - * - * <p> The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C) - * and then exit. - * - * @since 3.5.0 - */ - -public class TriggerExample -{ - //Numeric value of fixed window duration, in minutes - public static final int WINDOW_DURATION = 30; - // Constants used in triggers. - // Speeding up ONE_MINUTE or FIVE_MINUTES helps you get an early approximation of results. - // ONE_MINUTE is used only with processing time before the end of the window - public static final Duration ONE_MINUTE = Duration.standardMinutes(1); - // FIVE_MINUTES is used only with processing time after the end of the window - public static final Duration FIVE_MINUTES = Duration.standardMinutes(5); - // ONE_DAY is used to specify the amount of lateness allowed for the data elements. - public static final Duration ONE_DAY = Duration.standardDays(1); - - /** - * This transform demonstrates using triggers to control when data is produced for each window. - * Consider an example to understand the results generated by each type of trigger. - * The example uses "freeway" as the key. Event time is the timestamp associated with the data - * element and processing time is the time when the data element gets processed in the pipeline. - * For freeway 5, suppose there are 10 elements in the [10:00:00, 10:30:00) window.
- * Key (freeway) | Value (totalFlow) | event time | processing time - * 5 | 50 | 10:00:03 | 10:00:47 - * 5 | 30 | 10:01:00 | 10:01:03 - * 5 | 30 | 10:02:00 | 11:07:00 - * 5 | 20 | 10:04:10 | 10:05:15 - * 5 | 60 | 10:05:00 | 11:03:00 - * 5 | 20 | 10:05:01 | 11:07:30 - * 5 | 60 | 10:15:00 | 10:27:15 - * 5 | 40 | 10:26:40 | 10:26:43 - * 5 | 60 | 10:27:20 | 10:27:25 - * 5 | 60 | 10:29:00 | 11:11:00 - * - * <p> Dataflow tracks a watermark which records up to what point in event time the data is - * complete. For the purposes of the example, we'll assume the watermark is approximately 15m - * behind the current processing time. In practice, the actual value would vary over time based - * on the system's knowledge of the current PubSub delay and contents of the backlog (data - * that has not yet been processed). - * - * <p> If the watermark is 15m behind, then the window [10:00:00, 10:30:00) (in event time) would - * close at 10:44:59, when the watermark passes 10:30:00. - */ - static class CalculateTotalFlow - extends CompositeStreamTransform<ApexStream<String>, WindowedStream<SampleBean>> - { - private int windowDuration; - - CalculateTotalFlow(int windowDuration) - { - this.windowDuration = windowDuration; - } - - @Override - public WindowedStream<SampleBean> compose(ApexStream<String> inputStream) - { - // Concept #1: The default triggering behavior - // By default Dataflow uses a trigger which fires when the watermark has passed the end of the - // window. This would be written {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}. - - // The system also defaults to dropping late data -- data which arrives after the watermark - // has passed the event timestamp of the arriving element. This means that the default trigger - // will only fire once. - - // Each pane produced by the default trigger with no allowed lateness will be the first and - // last pane in the window, and will be ON_TIME.
- - // The results for the example above with the default trigger and zero allowed lateness - // would be: - // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing - // 5 | 260 | 6 | true | true | ON_TIME - - // At 11:03:00 (processing time) the system watermark may have advanced to 10:54:00. As a - // result, when the data record with event time 10:05:00 arrives at 11:03:00, it is considered - // late, and dropped. - - WindowedStream<SampleBean> defaultTriggerResults = inputStream - .window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)), - new TriggerOption().discardingFiredPanes()) - .addCompositeStreams(new TotalFlow("default")); - - // Concept #2: Late data with the default trigger - // This uses the same trigger as concept #1, but allows data that is up to ONE_DAY late. This - // leads to each window staying open for ONE_DAY after the watermark has passed the end of the - // window. Any late data will result in an additional pane being fired for that same window. - - // The first pane produced will be ON_TIME and the remaining panes will be LATE. - // To definitely get the last pane when the window closes, use - // .withAllowedLateness(ONE_DAY, ClosingBehavior.FIRE_ALWAYS). 
- - // The results for the example above with the default trigger and ONE_DAY allowed lateness - // would be: - // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing - // 5 | 260 | 6 | true | false | ON_TIME - // 5 | 60 | 1 | false | false | LATE - // 5 | 30 | 1 | false | false | LATE - // 5 | 20 | 1 | false | false | LATE - // 5 | 60 | 1 | false | false | LATE - WindowedStream<SampleBean> withAllowedLatenessResults = inputStream - .window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)), - new TriggerOption().discardingFiredPanes(), - Duration.standardDays(1)) - .addCompositeStreams(new TotalFlow("withAllowedLateness")); - - // Concept #3: How to get speculative estimates - // We can specify a trigger that fires independent of the watermark, for instance after - // ONE_MINUTE of processing time. This allows us to produce speculative estimates before - // all the data is available. Since we don't have any triggers that depend on the watermark - // we don't get an ON_TIME firing. Instead, all panes are either EARLY or LATE. - - // We also use accumulatingFiredPanes to build up the results across each pane firing. 
- - // The results for the example above for this trigger would be: - // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing - // 5 | 80 | 2 | true | false | EARLY - // 5 | 100 | 3 | false | false | EARLY - // 5 | 260 | 6 | false | false | EARLY - // 5 | 320 | 7 | false | false | LATE - // 5 | 370 | 9 | false | false | LATE - // 5 | 430 | 10 | false | false | LATE - - ApexStream<SampleBean> speculativeResults = inputStream - .window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)), - //Trigger fires every minute - new TriggerOption().withEarlyFiringsAtEvery(Duration.standardMinutes(1)) - // After emitting each pane, it will continue accumulating the elements so that each - // approximation includes all of the previous data in addition to the newly arrived - // data. - .accumulatingFiredPanes(), - Duration.standardDays(1)) - .addCompositeStreams(new TotalFlow("speculative")); - - // Concept #4: Combining late data and speculative estimates - // We can put the previous concepts together to get EARLY estimates, an ON_TIME result, - // and LATE updates based on late data. - - // Each time a triggering condition is satisfied it advances to the next trigger. - // If there are new elements this trigger emits a window under following condition: - // > Early approximations every minute till the end of the window. - // > An on-time firing when the watermark has passed the end of the window - // > Every five minutes of late data. - - // Every pane produced will either be EARLY, ON_TIME or LATE. 
- - // The results for the example above for this trigger would be: - // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing - // 5 | 80 | 2 | true | false | EARLY - // 5 | 100 | 3 | false | false | EARLY - // 5 | 260 | 6 | false | false | EARLY - // [First pane fired after the end of the window] - // 5 | 320 | 7 | false | false | ON_TIME - // 5 | 430 | 10 | false | false | LATE - - // For more possibilities of how to build advanced triggers, see {@link Trigger}. - WindowedStream<SampleBean> sequentialResults = inputStream - .window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)), - // Speculative every ONE_MINUTE - new TriggerOption().withEarlyFiringsAtEvery(Duration.standardMinutes(1)) - .withLateFiringsAtEvery(Duration.standardMinutes(5)) - // After emitting each pane, it will continue accumulating the elements so that each - // approximation includes all of the previous data in addition to the newly arrived - // data. - .accumulatingFiredPanes(), - Duration.standardDays(1)) - .addCompositeStreams(new TotalFlow("sequential")); - - return sequentialResults; - } - - } - - ////////////////////////////////////////////////////////////////////////////////////////////////// - // The remaining parts of the pipeline are needed to produce the output for each - // concept above. Not directly relevant to understanding the trigger examples. - - /** - * Calculate total flow and number of records for each freeway and format the results to TableRow - * objects, to save to BigQuery. 
- */ - static class TotalFlow extends - CompositeStreamTransform<WindowedStream<String>, WindowedStream<SampleBean>> - { - private String triggerType; - - public TotalFlow(String triggerType) - { - this.triggerType = triggerType; - } - - @Override - public WindowedStream<SampleBean> compose(WindowedStream<String> inputStream) - { - - WindowedStream<KeyValPair<String, Iterable<Integer>>> flowPerFreeway = inputStream - .groupByKey(new ExtractFlowInfo()); - - return flowPerFreeway - .map(new Function.MapFunction<KeyValPair<String, Iterable<Integer>>, KeyValPair<String, String>>() - { - @Override - public KeyValPair<String, String> f(KeyValPair<String, Iterable<Integer>> input) - { - Iterable<Integer> flows = input.getValue(); - Integer sum = 0; - Long numberOfRecords = 0L; - for (Integer value : flows) { - sum += value; - numberOfRecords++; - } - return new KeyValPair<>(input.getKey(), sum + "," + numberOfRecords); - } - }) - .map(new FormatTotalFlow(triggerType)); - } - } - - /** - * Format the results of the Total flow calculation to a TableRow, to save to BigQuery. - * Adds the triggerType, pane information, processing time and the window timestamp. - */ - static class FormatTotalFlow implements Function.MapFunction<KeyValPair<String, String>, SampleBean> - { - private String triggerType; - - public FormatTotalFlow(String triggerType) - { - this.triggerType = triggerType; - } - - @Override - public SampleBean f(KeyValPair<String, String> input) - { - String[] values = input.getValue().split(","); - //TODO need to have a callback to get the metadata like window id, pane id, timestamps etc. 
- return new SampleBean(triggerType, input.getKey(), Integer.parseInt(values[0]), Long - .parseLong(values[1]), null, false, false, null, null, new Date()); - } - } - - public static class SampleBean - { - public SampleBean() - { - } - - private String triggerType; - - private String freeway; - - private int totalFlow; - - private long numberOfRecords; - - private String window; - - private boolean isFirst; - - private boolean isLast; - - private Date timing; - - private Date eventTime; - - private Date processingTime; - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - SampleBean that = (SampleBean)o; - return totalFlow == that.totalFlow && - numberOfRecords == that.numberOfRecords && - isFirst == that.isFirst && - isLast == that.isLast && - Objects.equals(triggerType, that.triggerType) && - Objects.equals(freeway, that.freeway) && - Objects.equals(window, that.window) && - Objects.equals(timing, that.timing) && - Objects.equals(eventTime, that.eventTime) && - Objects.equals(processingTime, that.processingTime); - } - - @Override - public int hashCode() - { - return Objects - .hash(triggerType, freeway, totalFlow, numberOfRecords, window, isFirst, isLast, timing, eventTime, - processingTime); - } - - public SampleBean(String triggerType, String freeway, int totalFlow, long numberOfRecords, String window, boolean isFirst, boolean isLast, Date timing, Date eventTime, Date processingTime) - { - - this.triggerType = triggerType; - this.freeway = freeway; - this.totalFlow = totalFlow; - this.numberOfRecords = numberOfRecords; - this.window = window; - this.isFirst = isFirst; - this.isLast = isLast; - this.timing = timing; - this.eventTime = eventTime; - this.processingTime = processingTime; - } - - public String getTriggerType() - { - return triggerType; - } - - public void setTriggerType(String triggerType) - { - this.triggerType = triggerType; - } - - public 
String getFreeway() - { - return freeway; - } - - public void setFreeway(String freeway) - { - this.freeway = freeway; - } - - public int getTotalFlow() - { - return totalFlow; - } - - public void setTotalFlow(int totalFlow) - { - this.totalFlow = totalFlow; - } - - public long getNumberOfRecords() - { - return numberOfRecords; - } - - public void setNumberOfRecords(long numberOfRecords) - { - this.numberOfRecords = numberOfRecords; - } - - public String getWindow() - { - return window; - } - - public void setWindow(String window) - { - this.window = window; - } - - public boolean isFirst() - { - return isFirst; - } - - public void setFirst(boolean first) - { - isFirst = first; - } - - public boolean isLast() - { - return isLast; - } - - public void setLast(boolean last) - { - isLast = last; - } - - public Date getTiming() - { - return timing; - } - - public void setTiming(Date timing) - { - this.timing = timing; - } - - public Date getEventTime() - { - return eventTime; - } - - public void setEventTime(Date eventTime) - { - this.eventTime = eventTime; - } - - public Date getProcessingTime() - { - return processingTime; - } - - public void setProcessingTime(Date processingTime) - { - this.processingTime = processingTime; - } - } - - /** - * Extract the freeway and total flow in a reading. - * Freeway is used as key since we are calculating the total flow for each freeway. - */ - static class ExtractFlowInfo implements Function.ToKeyValue<String, String, Integer> - { - @Override - public Tuple<KeyValPair<String, Integer>> f(String input) - { - String[] laneInfo = input.split(","); - if (laneInfo[0].equals("timestamp")) { - // Header row - return null; - } - if (laneInfo.length < 48) { - //Skip the invalid input. - return null; - } - String freeway = laneInfo[2]; - Integer totalFlow = tryIntegerParse(laneInfo[7]); - // Ignore the records with total flow 0 to easily understand the working of triggers. 
- // Skip the records with total flow -1 since they are invalid input. - if (totalFlow == null || totalFlow <= 0) { - return null; - } - return new Tuple.PlainTuple<>(new KeyValPair<>(freeway, totalFlow)); - } - } - - private static final String PUBSUB_TIMESTAMP_LABEL_KEY = "timestamp_ms"; - - public static void main(String[] args) throws Exception - { - StreamFactory.fromFolder("some folder") - .addCompositeStreams(new CalculateTotalFlow(60)); - - } - - private static Integer tryIntegerParse(String number) - { - try { - return Integer.parseInt(number); - } catch (NumberFormatException e) { - return null; - } - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/resources/META-INF/properties.xml b/demos/highlevelapi/src/main/resources/META-INF/properties.xml deleted file mode 100644 index ead0460..0000000 --- a/demos/highlevelapi/src/main/resources/META-INF/properties.xml +++ /dev/null @@ -1,141 +0,0 @@ -<?xml version="1.0"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. 
- ---> -<configuration> - <!-- - <property> - <name>dt.application.{appName}.operator.{opName}.prop.{propName}</name> - <value>some-default-value (if value is not specified, it is required from the user or custom config when launching)</value> - </property> - --> - - <!-- Properties for TwitterAutoComplete, please fill out all of them to make the application work --> - <property> - <name>dt.application.TwitterAutoComplete.operator.tweetSampler.consumerKey</name> - <value></value> - </property> - <property> - <name>dt.application.TwitterAutoComplete.operator.tweetSampler.consumerSecret</name> - <value></value> - </property> - <property> - <name>dt.application.TwitterAutoComplete.operator.tweetSampler.accessToken</name> - <value></value> - </property> - <property> - <name>dt.application.TwitterAutoComplete.operator.tweetSampler.accessTokenSecret</name> - <value></value> - </property> - - <!-- Properties for StreamingWordExtract --> - <property> - <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.store.userName</name> - <value>root</value> - </property> - <property> - <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.store.password</name> - <value>password</value> - </property> - <property> - <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.store.databaseDriver</name> - <value>org.hsqldb.jdbcDriver</value> - </property> - <property> - <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.batchSize</name> - <value>5</value> - </property> - <property> - <name>dt.application.StreamingWordExtract.operator.jdbcOutput.port.input.attr.TUPLE_CLASS</name> - <value>org.apache.apex.malhar.stream.sample.complete.PojoEvent</value> - </property> - <property> - <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.store.databaseUrl</name> - <value>jdbc:hsqldb:mem:test</value> - </property> - <property> - <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.tablename</name> - 
<value>Test</value> - </property> - - <!-- Properties for MaxPerKeyExamples --> - <property> - <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.store.userName</name> - <value>root</value> - </property> - <property> - <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.store.password</name> - <value>password</value> - </property> - <property> - <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.store.databaseDriver</name> - <value>org.hsqldb.jdbcDriver</value> - </property> - <property> - <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.batchSize</name> - <value>5</value> - </property> - <property> - <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.port.outputPort.attr.TUPLE_CLASS</name> - <value>org.apache.apex.malhar.stream.sample.cookbook.InputPojo</value> - </property> - <property> - <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.store.databaseUrl</name> - <value>jdbc:hsqldb:mem:test</value> - </property> - <property> - <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.tableName</name> - <value>InputTable</value> - </property> - <property> - <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.query</name> - <value>SELECT * FROM InputTable;</value> - </property> - <property> - <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.store.userName</name> - <value>root</value> - </property> - <property> - <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.store.password</name> - <value>password</value> - </property> - <property> - <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.store.databaseDriver</name> - <value>org.hsqldb.jdbcDriver</value> - </property> - <property> - <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.batchSize</name> - <value>5</value> - </property> - <property> - <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.port.input.attr.TUPLE_CLASS</name> - 
<value>org.apache.apex.malhar.stream.sample.cookbook.OutputPojo</value> - </property> - <property> - <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.store.databaseUrl</name> - <value>jdbc:hsqldb:mem:test</value> - </property> - <property> - <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.tablename</name> - <value>OutputTable</value> - </property> - -</configuration> - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java deleted file mode 100644 index c078683..0000000 --- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.apex.malhar.stream.sample; - -import java.util.concurrent.Callable; - -import org.junit.Assert; -import org.junit.Test; -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.LocalMode; -import com.datatorrent.stram.StramLocalCluster; - -/** - * Test for {@link MinimalWordCount}. - */ -public class MinimalWordCountTest -{ - @Test - public void MinimalWordCountTest() throws Exception - { - LocalMode lma = LocalMode.newInstance(); - Configuration conf = new Configuration(false); - conf.set("dt.application.MinimalWordCount.operator.console.silent", "true"); - MinimalWordCount app = new MinimalWordCount(); - - lma.prepareDAG(app, conf); - - LocalMode.Controller lc = lma.getController(); - ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>() - { - @Override - public Boolean call() throws Exception - { - return MinimalWordCount.Collector.isDone(); - } - }); - - lc.run(10000); - - Assert.assertTrue(MinimalWordCount.Collector.result.get("error") == 7); - Assert.assertTrue(MinimalWordCount.Collector.result.get("word") == 119); - Assert.assertTrue(MinimalWordCount.Collector.result.get("bye") == 1); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java deleted file mode 100644 index f0c51f6..0000000 --- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.stream.sample; - -import java.util.Map; -import java.util.concurrent.Callable; - -import org.junit.Assert; -import org.junit.Test; - -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.LocalMode; -import com.datatorrent.lib.util.KeyValPair; -import com.datatorrent.stram.StramLocalCluster; - -/** - * Testing the TwitterAutoComplete Application. In order to run this test, you need to create an app - * at https://apps.twitter.com, then generate your consumer and access keys and tokens, and set the following properties - * for the application before running it: - * Your application consumer key, - * Your application consumer secret, - * Your twitter access token, and - * Your twitter access token secret. 
- */ -public class WindowedWordCountTest -{ - @Test - public void WindowedWordCountTest() throws Exception - { - LocalMode lma = LocalMode.newInstance(); - Configuration conf = new Configuration(false); - conf.set("dt.application.WindowedWordCount.operator.console.silent", "true"); - lma.prepareDAG(new WindowedWordCount(), conf); - LocalMode.Controller lc = lma.getController(); - ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>() - { - @Override - public Boolean call() throws Exception - { - return WindowedWordCount.Collector.isDone(); - } - }); - - lc.run(60000); - - Assert.assertEquals(127, countSum(WindowedWordCount.Collector.getResult())); - Assert.assertEquals(28, countSumWord(WindowedWordCount.Collector.getResult(), "word2")); - Assert.assertEquals(7, countSumWord(WindowedWordCount.Collector.getResult(), "error")); - Assert.assertEquals(21, countSumWord(WindowedWordCount.Collector.getResult(), "word9")); - Assert.assertEquals(1, countSumWord(WindowedWordCount.Collector.getResult(), "bye")); - } - - public long countSum(Map<KeyValPair<Long, String>, Long> map) - { - long sum = 0; - for (long count : map.values()) { - sum += count; - } - return sum; - } - - public long countSumWord(Map<KeyValPair<Long, String>, Long> map, String word) - { - long sum = 0; - for (Map.Entry<KeyValPair<Long, String>, Long> entry : map.entrySet()) { - if (entry.getKey().getValue().equals(word)) { - sum += entry.getValue(); - } - } - return sum; - } - -} - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java deleted file mode 100644 index 4ed2d5d..0000000 --- 
a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.stream.sample.complete; - -import java.util.concurrent.Callable; - -import org.junit.Assert; -import org.junit.Test; - -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.LocalMode; -import com.datatorrent.stram.StramLocalCluster; - -/** - * Testing the AutoComplete Application - */ -public class AutoCompleteTest -{ - - @Test - public void AutoCompleteTest() throws Exception - { - LocalMode lma = LocalMode.newInstance(); - Configuration conf = new Configuration(false); - conf.set("dt.application.AutoComplete.operator.console.silent", "true"); - lma.prepareDAG(new AutoComplete(), conf); - LocalMode.Controller lc = lma.getController(); - - ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>() - { - @Override - public Boolean call() throws Exception - { - return AutoComplete.Collector.isDone(); - } - }); - - lc.run(200000); - - Assert.assertTrue(AutoComplete.Collector.getResult().containsKey("had")); - 
Assert.assertTrue(AutoComplete.Collector.getResult().containsKey("hadoop")); - Assert.assertEquals(2, AutoComplete.Collector.getResult().get("mapreduce").get(0).getCount()); - - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java deleted file mode 100644 index dc9cdec..0000000 --- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java +++ /dev/null @@ -1,144 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.apex.malhar.stream.sample.complete; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.concurrent.Callable; - -import org.junit.After; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; - -import org.apache.hadoop.conf.Configuration; - -import com.google.common.base.Throwables; -import com.datatorrent.api.LocalMode; -import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore; -import com.datatorrent.stram.StramLocalCluster; - -/** - * Testing StreamingWordExtract application - */ -public class StreamingWordExtractTest -{ - private static final String TUPLE_CLASS = "org.apache.apex.malhar.stream.sample.complete.PojoEvent"; - private static final String DB_DRIVER = "org.h2.Driver"; - private static final String DB_URL = "jdbc:h2:~/test"; - private static final String TABLE_NAME = "Test"; - private static final String USER_NAME = "root"; - private static final String PSW = "password"; - - @BeforeClass - public static void setup() - { - try { - Class.forName(DB_DRIVER).newInstance(); - - Connection con = DriverManager.getConnection(DB_URL,USER_NAME,PSW); - Statement stmt = con.createStatement(); - - String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( " - + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, " - + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, " - + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, " - + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", " - + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") " - + ")"; - stmt.executeUpdate(createMetaTable); - - String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME - + "(STRINGVALUE VARCHAR(255))"; - stmt.executeUpdate(createTable); - - } catch (Throwable e) { - 
throw Throwables.propagate(e); - } - } - - @After - public void cleanTable() - { - try { - Connection con = DriverManager.getConnection(DB_URL,USER_NAME,PSW); - Statement stmt = con.createStatement(); - String dropTable = "drop table " + TABLE_NAME; - stmt.executeUpdate(dropTable); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - public void setConfig(Configuration conf) - { - conf.set("dt.operator.jdbcOutput.prop.store.userName", USER_NAME); - conf.set("dt.operator.jdbcOutput.prop.store.password", PSW); - conf.set("dt.operator.jdbcOutput.prop.store.databaseDriver", DB_DRIVER); - conf.set("dt.operator.jdbcOutput.prop.batchSize", "5"); - conf.set("dt.operator.jdbcOutput.port.input.attr.TUPLE_CLASS", TUPLE_CLASS); - conf.set("dt.operator.jdbcOutput.prop.store.databaseUrl", DB_URL); - conf.set("dt.operator.jdbcOutput.prop.tablename", TABLE_NAME); - } - - public int getNumOfEventsInStore() - { - Connection con; - try { - con = DriverManager.getConnection(DB_URL,USER_NAME,PSW); - Statement stmt = con.createStatement(); - - String countQuery = "SELECT count(*) from " + TABLE_NAME; - ResultSet resultSet = stmt.executeQuery(countQuery); - resultSet.next(); - return resultSet.getInt(1); - } catch (SQLException e) { - throw new RuntimeException("fetching count", e); - } - } - - @Test - public void StreamingWordExtractTest() throws Exception - { - LocalMode lma = LocalMode.newInstance(); - Configuration conf = new Configuration(false); - setConfig(conf); - StreamingWordExtract app = new StreamingWordExtract(); - lma.prepareDAG(app, conf); - LocalMode.Controller lc = lma.getController(); - - ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>() - { - @Override - public Boolean call() throws Exception - { - return getNumOfEventsInStore() == 36; - } - }); - - lc.run(10000); - - Assert.assertEquals(app.getWordCount(), getNumOfEventsInStore()); - Assert.assertEquals(app.getEntriesMapped(), getNumOfEventsInStore()); - } - -} 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java deleted file mode 100644 index fddf511..0000000 --- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.stream.sample.complete; - -import java.util.List; -import java.util.concurrent.Callable; - -import org.junit.Assert; -import org.junit.Test; - -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.LocalMode; -import com.datatorrent.stram.StramLocalCluster; - -/** - * Testing the {@link TopWikipediaSessions} Application. 
- */ -public class TopWikipediaSessionsTest -{ - @Test - public void TopWikipediaSessionsTest() throws Exception - { - LocalMode lma = LocalMode.newInstance(); - Configuration conf = new Configuration(false); - conf.set("dt.application.TopWikipediaSessions.operator.console.silent", "true"); - lma.prepareDAG(new TopWikipediaSessions(), conf); - LocalMode.Controller lc = lma.getController(); - - ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>() - { - @Override - public Boolean call() throws Exception - { - return TopWikipediaSessions.SessionGen.getTupleCount() >= 250; - } - }); - - lc.run(30000); - - for (int i = 0; i < TopWikipediaSessions.Collector.getResult().size(); i++) { - Assert.assertTrue(isInOrder(TopWikipediaSessions.Collector.getResult().get(i))); - } - } - - public boolean isInOrder(List<TopWikipediaSessions.TempWrapper> input) - { - if (input.size() == 0 || input.size() == 1) { - return true; - } - for (int i = 0; i < input.size() - 2; i++) { - if (input.get(i).getValue().getValue() < input.get(i + 1).getValue().getValue()) { - return false; - } - } - return true; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java deleted file mode 100644 index 766fa60..0000000 --- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.stream.sample.complete; - -import java.util.Map; -import java.util.concurrent.Callable; - -import org.junit.Assert; -import org.junit.Test; - -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.LocalMode; -import com.datatorrent.lib.util.KeyValPair; -import com.datatorrent.stram.StramLocalCluster; - -/** - * Testing the {@link TrafficRoutes} Application. 
- */ -public class TrafficRoutesTest -{ - - @Test - public void TrafficRoutesTest() throws Exception - { - LocalMode lma = LocalMode.newInstance(); - Configuration conf = new Configuration(false); - conf.set("dt.application.TrafficRoutes.operator.console.silent", "true"); - lma.prepareDAG(new TrafficRoutes(), conf); - LocalMode.Controller lc = lma.getController(); - - ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>() - { - @Override - public Boolean call() throws Exception - { - return TrafficRoutes.InfoGen.getTupleCount() >= 100; - } - }); - - lc.run(60000); - - Assert.assertTrue(!TrafficRoutes.Collector.getResult().isEmpty()); - for (Map.Entry<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> entry : TrafficRoutes.Collector.getResult().entrySet()) { - Assert.assertTrue(entry.getValue().getKey() <= 75); - Assert.assertTrue(entry.getValue().getKey() >= 55); - Assert.assertTrue(entry.getKey().getValue().equals("SDRoute1") || entry.getKey().getValue().equals("SDRoute2")); - } - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoCompleteTest.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoCompleteTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoCompleteTest.java deleted file mode 100644 index 9ba2f25..0000000 --- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoCompleteTest.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.stream.sample.complete; - -import org.junit.Ignore; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.LocalMode; - -/** - * Testing the TwitterAutoComplete Application. In order to run this test, you need to create an app - * at https://apps.twitter.com, then generate your consumer and access keys and tokens, and set the following properties - * for the application before running it: - * Your application consumer key, - * Your application consumer secret, - * Your twitter access token, and - * Your twitter access token secret. - * - * This test is mainly for local demonstration purpose. Default time to run the application is 1 minute, please - * set the time you need to run the application before you run. - */ -public class TwitterAutoCompleteTest -{ - private static final Logger logger = LoggerFactory.getLogger(org.apache.apex.malhar.stream.sample.complete.AutoCompleteTest.class); - - @Test - @Ignore - public void TwitterAutoCompleteTest() throws Exception - { - LocalMode lma = LocalMode.newInstance(); - Configuration conf = new Configuration(false); - //uncomment the following lines and change YOUR_XXX to the corresponding information needed. 
- //conf.set("dt.application.TwitterAutoComplete.operator.tweetSampler.consumerKey", "YOUR_CONSUMERKEY"); - //conf.set("dt.application.TwitterAutoComplete.operator.tweetSampler.consumerSecret", "YOUR_CONSUERSECRET"); - //conf.set("dt.application.TwitterAutoComplete.operator.tweetSampler.accessToken", "YOUR_ACCESSTOKEN"); - //conf.set("dt.application.TwitterAutoComplete.operator.tweetSampler.accessTokenSecret", "YOUR_TOKENSECRET"); - lma.prepareDAG(new TwitterAutoComplete(), conf); - LocalMode.Controller lc = lma.getController(); - long start = System.currentTimeMillis(); - lc.run(60000); // Set your desired time to run the application here. - long end = System.currentTimeMillis(); - long time = end - start; - logger.info("Test used " + time + " ms"); - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java deleted file mode 100644 index 1e14fff..0000000 --- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.stream.sample.cookbook; - -import java.util.concurrent.Callable; -import org.junit.Assert; -import org.junit.Test; -import org.apache.hadoop.conf.Configuration; -import com.datatorrent.api.LocalMode; -import com.datatorrent.stram.StramLocalCluster; - -/** - * Test for {@link CombinePerKeyExamples}. - */ -public class CombinePerKeyExamplesTest -{ - @Test - public void CombinePerKeyExamplesTest() throws Exception - { - LocalMode lma = LocalMode.newInstance(); - Configuration conf = new Configuration(false); - conf.set("dt.application.CombinePerKeyExamples.operator.console.silent", "true"); - CombinePerKeyExamples app = new CombinePerKeyExamples(); - - lma.prepareDAG(app, conf); - - LocalMode.Controller lc = lma.getController(); - ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>() - { - @Override - public Boolean call() throws Exception - { - return CombinePerKeyExamples.Collector.isDone(); - } - }); - lc.run(100000); - - Assert.assertTrue(CombinePerKeyExamples.Collector.getResult().get(CombinePerKeyExamples.Collector.getResult().size() - 2).getCorpus().contains("1, 2, 3, 4, 5, 6, 7, 8")); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java 
b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java deleted file mode 100644 index 7f93f50..0000000 --- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.stream.sample.cookbook; - -import java.util.concurrent.Callable; - -import org.junit.Assert; -import org.junit.Test; -import org.apache.hadoop.conf.Configuration; -import com.datatorrent.api.LocalMode; - -import com.datatorrent.stram.StramLocalCluster; - - -/** - * Test for {@link DeDupExample}. 
- */ -public class DeDupExampleTest -{ - @Test - public void DeDupExampleTest() throws Exception - { - LocalMode lma = LocalMode.newInstance(); - Configuration conf = new Configuration(false); - conf.set("dt.application.DeDupExample.operator.console.silent", "true"); - DeDupExample app = new DeDupExample(); - lma.prepareDAG(app, conf); - LocalMode.Controller lc = lma.getController(); - ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>() - { - @Override - public Boolean call() throws Exception - { - return DeDupExample.Collector.isDone(); - } - }); - lc.run(50000); - - Assert.assertEquals(9, DeDupExample.Collector.getResult().getValue().size()); - - } - -}
