http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java ---------------------------------------------------------------------- diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java new file mode 100644 index 0000000..9fd9495 --- /dev/null +++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java @@ -0,0 +1,205 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.sample.cookbook; + +import java.util.List; + +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.Window; +import org.apache.apex.malhar.lib.window.WindowOption; +import org.apache.apex.malhar.lib.window.accumulation.Max; +import org.apache.apex.malhar.stream.api.ApexStream; +import org.apache.apex.malhar.stream.api.CompositeStreamTransform; +import org.apache.apex.malhar.stream.api.WindowedStream; +import org.apache.apex.malhar.stream.api.function.Function; +import org.apache.apex.malhar.stream.api.impl.StreamFactory; +import org.apache.hadoop.conf.Configuration; + +import static java.sql.Types.DOUBLE; +import static java.sql.Types.INTEGER; + +import com.google.common.collect.Lists; +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.lib.db.jdbc.JdbcFieldInfo; +import com.datatorrent.lib.db.jdbc.JdbcPOJOInputOperator; +import com.datatorrent.lib.db.jdbc.JdbcPOJOInsertOutputOperator; +import com.datatorrent.lib.db.jdbc.JdbcStore; +import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore; +import com.datatorrent.lib.util.FieldInfo; +import com.datatorrent.lib.util.KeyValPair; + +import static org.apache.apex.malhar.stream.api.Option.Options.name; + +/** + * MaxPerKeyExamples Application from Beam + * + * @since 3.5.0 + */ +@ApplicationAnnotation(name = "MaxPerKeyExamples") +public class MaxPerKeyExamples implements StreamingApplication +{ + + /** + * A map function to extract the mean temperature from {@link InputPojo}. 
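+ * For illustration (hypothetical values): an {@link InputPojo} with month 6, day 21, year 2014 and meanTemp 85.3 is mapped to the pair (6, 85.3).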
+ */ + public static class ExtractTempFn implements Function.MapFunction<InputPojo, KeyValPair<Integer, Double>> + { + @Override + public KeyValPair<Integer, Double> f(InputPojo row) + { + Integer month = row.getMonth(); + Double meanTemp = row.getMeanTemp(); + return new KeyValPair<Integer, Double>(month, meanTemp); + } + } + + + /** + * A map function to format output to {@link OutputPojo}. + */ + public static class FormatMaxesFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<Integer, Double>>, OutputPojo> + { + @Override + public OutputPojo f(Tuple.WindowedTuple<KeyValPair<Integer, Double>> input) + { + OutputPojo row = new OutputPojo(); + row.setMonth(input.getValue().getKey()); + row.setMeanTemp(input.getValue().getValue()); + return row; + } + } + + /** + * A composite transformation to perform three tasks: + * 1. extract the month and its mean temperature from the input pojo. + * 2. find the maximum mean temperature for every month. + * 3. format the result to an output pojo object. + */ + public static class MaxMeanTemp extends CompositeStreamTransform<WindowedStream<InputPojo>, WindowedStream<OutputPojo>> + { + @Override + public WindowedStream<OutputPojo> compose(WindowedStream<InputPojo> rows) + { + // InputPojo... => <month, meanTemp> ... + WindowedStream<KeyValPair<Integer, Double>> temps = rows.map(new ExtractTempFn(), name("ExtractTempFn")); + + // month, meanTemp... => <month, max mean temp>... + WindowedStream<Tuple.WindowedTuple<KeyValPair<Integer, Double>>> tempMaxes = + temps.accumulateByKey(new Max<Double>(), + new Function.ToKeyValue<KeyValPair<Integer, Double>, Integer, Double>() + { + @Override + public Tuple<KeyValPair<Integer, Double>> f(KeyValPair<Integer, Double> input) + { + return new Tuple.WindowedTuple<KeyValPair<Integer, Double>>(Window.GlobalWindow.INSTANCE, input); + } + }, name("MaxPerMonth")); + + // <month, max>... => OutputPojo... + WindowedStream<OutputPojo> results = tempMaxes.map(new FormatMaxesFn(), name("FormatMaxesFn")); + + return results; + } + } + + /** + * Method to set field info for {@link JdbcPOJOInputOperator}. + * @return the list of input field infos + */ + private List<FieldInfo> addInputFieldInfos() + { + List<FieldInfo> fieldInfos = Lists.newArrayList(); + fieldInfos.add(new FieldInfo("MONTH", "month", FieldInfo.SupportType.INTEGER)); + fieldInfos.add(new FieldInfo("DAY", "day", FieldInfo.SupportType.INTEGER)); + fieldInfos.add(new FieldInfo("YEAR", "year", FieldInfo.SupportType.INTEGER)); + fieldInfos.add(new FieldInfo("MEANTEMP", "meanTemp", FieldInfo.SupportType.DOUBLE)); + return fieldInfos; + } + + /** + * Method to set field info for {@link JdbcPOJOInsertOutputOperator}. + * @return the list of output field infos + */ + private List<JdbcFieldInfo> addOutputFieldInfos() + { + List<JdbcFieldInfo> fieldInfos = Lists.newArrayList(); + fieldInfos.add(new JdbcFieldInfo("MONTH", "month", JdbcFieldInfo.SupportType.INTEGER, INTEGER)); + fieldInfos.add(new JdbcFieldInfo("MEANTEMP", "meanTemp", JdbcFieldInfo.SupportType.DOUBLE, DOUBLE)); + return fieldInfos; + } + + + /** + * Populate the dag using the High-Level API.
+ * @param dag + * @param conf + */ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + JdbcPOJOInputOperator jdbcInput = new JdbcPOJOInputOperator(); + jdbcInput.setFieldInfos(addInputFieldInfos()); + + JdbcStore store = new JdbcStore(); + jdbcInput.setStore(store); + + JdbcPOJOInsertOutputOperator jdbcOutput = new JdbcPOJOInsertOutputOperator(); + jdbcOutput.setFieldInfos(addOutputFieldInfos()); + JdbcTransactionalStore outputStore = new JdbcTransactionalStore(); + jdbcOutput.setStore(outputStore); + + // Create a stream that reads from the Jdbc Input. + ApexStream<Object> stream = StreamFactory.fromInput(jdbcInput, jdbcInput.outputPort, name("jdbcInput")) + + // Apply window and trigger options to the stream. + .window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1)) + + // Because the Jdbc Input sends out a stream of Object, we need to cast the tuples to InputPojo. + .map(new Function.MapFunction<Object, InputPojo>() + { + @Override + public InputPojo f(Object input) + { + return (InputPojo)input; + } + }, name("ObjectToInputPojo")) + + // Plug the composite transformation into the stream to calculate the maximum temperature for each month. + .addCompositeStreams(new MaxMeanTemp()) + + // Cast the resulting OutputPojo to Object for the Jdbc Output to consume. + .map(new Function.MapFunction<OutputPojo, Object>() + { + @Override + public Object f(OutputPojo input) + { + return (Object)input; + } + }, name("OutputPojoToObject")) + + // Output the result to the Jdbc Output. + .endWith(jdbcOutput, jdbcOutput.input, name("jdbcOutput")); + + stream.populateDag(dag); + + } +}
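MaxPerKeyExamples reads from and writes to HSQLDB through the JdbcPOJO operators, so the tables must exist before launch. Below is a minimal setup sketch: the connection settings, table names, and columns are taken from the META-INF/properties.xml and field infos in this commit, while the DDL column types, the class name, and the seed row are assumptions for illustration.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;

// Hypothetical helper, not part of this commit.
public class MaxPerKeyExamplesTableSetup
{
  public static void main(String[] args) throws Exception
  {
    Class.forName("org.hsqldb.jdbcDriver").newInstance();
    try (Connection con = DriverManager.getConnection("jdbc:hsqldb:mem:test", "root", "password");
        Statement stmt = con.createStatement()) {
      // Input table read by JdbcPOJOInputOperator; columns match addInputFieldInfos().
      stmt.executeUpdate("CREATE TABLE IF NOT EXISTS InputTable "
          + "(MONTH INT, DAY INT, YEAR INT, MEANTEMP DOUBLE)");
      // Output table written by JdbcPOJOInsertOutputOperator; columns match addOutputFieldInfos().
      stmt.executeUpdate("CREATE TABLE IF NOT EXISTS OutputTable (MONTH INT, MEANTEMP DOUBLE)");
      // Meta table required by JdbcTransactionalStore to track committed windows.
      stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ("
          + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
          + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
          + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL)");
      // Hypothetical seed row so the example has something to read.
      stmt.executeUpdate("INSERT INTO InputTable VALUES (6, 21, 2014, 85.3)");
    }
  }
}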
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java ---------------------------------------------------------------------- diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java new file mode 100644 index 0000000..f3d0c64 --- /dev/null +++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.sample.cookbook; + +/** + * OutputPojo Tuple Class for jdbcOutput of {@link MaxPerKeyExamples}. + * + * @since 3.5.0 + */ +public class OutputPojo +{ + private int month; + private double meanTemp; + + @Override + public String toString() + { + return "PojoEvent [month=" + getMonth() + ", meanTemp=" + getMeanTemp() + "]"; + } + + public void setMonth(int month) + { + this.month = month; + } + + public int getMonth() + { + return this.month; + } + + public void setMeanTemp(double meanTemp) + { + this.meanTemp = meanTemp; + } + + public double getMeanTemp() + { + return meanTemp; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java ---------------------------------------------------------------------- diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java new file mode 100644 index 0000000..962faa5 --- /dev/null +++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java @@ -0,0 +1,577 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.stream.sample.cookbook; + +import java.util.Date; +import java.util.Objects; + +import org.joda.time.Duration; + +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.WindowOption; +import org.apache.apex.malhar.stream.api.ApexStream; +import org.apache.apex.malhar.stream.api.CompositeStreamTransform; +import org.apache.apex.malhar.stream.api.WindowedStream; +import org.apache.apex.malhar.stream.api.function.Function; +import org.apache.apex.malhar.stream.api.impl.StreamFactory; + +import com.datatorrent.lib.util.KeyValPair; + +/** + * This example illustrates the basic concepts behind triggering. It shows how to use different + * trigger definitions to produce partial (speculative) results before all the data is processed and + * to control when updated results are produced for late data. The example performs a streaming + * analysis of the data coming in from PubSub and writes the results to BigQuery. It divides the + * data into {@link Window windows} to be processed, and demonstrates using various kinds of + * {@link org.apache.beam.sdk.transforms.windowing.Trigger triggers} to control when the results for + * each window are emitted. + * + * <p> This example uses a portion of real traffic data from San Diego freeways. It contains + * readings from sensor stations set up along each freeway. Each sensor reading includes a + * calculation of the 'total flow' across all lanes in that freeway direction. + * + * <p> Concepts: + * <pre> + * 1. The default triggering behavior + * 2. Late data with the default trigger + * 3. How to get speculative estimates + * 4. Combining late data and speculative estimates + * </pre> + * + * <p> Before running this example, it will be useful to familiarize yourself with Dataflow triggers + * and understand the concept of 'late data', + * See: <a href="https://cloud.google.com/dataflow/model/triggers"> + * https://cloud.google.com/dataflow/model/triggers </a> and + * <a href="https://cloud.google.com/dataflow/model/windowing#Advanced"> + * https://cloud.google.com/dataflow/model/windowing#Advanced </a> + * + * <p> The example pipeline reads data from a Pub/Sub topic. By default, running the example will + * also run an auxiliary pipeline to inject data from the default {@code --input} file to the + * {@code --pubsubTopic}. The auxiliary pipeline puts a timestamp on the injected data so that the + * example pipeline can operate on <i>event time</i> (rather than arrival time). The auxiliary + * pipeline also randomly simulates late data, by setting the timestamps of some of the data + * elements to be in the past. You may override the default {@code --input} with the file of your + * choosing or set {@code --input=""} which will disable the automatic Pub/Sub injection, and allow + * you to use a separate tool to publish to the given topic. + * + * <p> The example is configured to use the default Pub/Sub topic and the default BigQuery table + * from the example common package (there are no defaults for a general Dataflow pipeline). + * You can override them by using the {@code --pubsubTopic}, {@code --bigQueryDataset}, and + * {@code --bigQueryTable} options. If the Pub/Sub topic or the BigQuery table do not exist, + * the example will try to create them. + * + * <p> The pipeline outputs its results to a BigQuery table. 
+ * Here are some queries you can use to see interesting results: + * Replace {@code <enter_table_name>} in the query below with the name of the BigQuery table. + * Replace {@code <enter_window_interval>} in the query below with the window interval. + * + * <p> To see the results of the default trigger, + * Note: When you start up your pipeline, you'll initially see results from 'late' data. Wait until after + * the window duration, when the first pane of non-late data has been emitted, to see more + * interesting results. + * {@code SELECT * FROM <enter_table_name> WHERE triggerType = "default" ORDER BY window DESC} + * + * <p> To see the late data, i.e. the data dropped by the default trigger, + * {@code SELECT * FROM <enter_table_name> WHERE triggerType = "withAllowedLateness" and + * (timing = "LATE" or timing = "ON_TIME") and freeway = "5" ORDER BY window DESC, processingTime} + * + * <p> To see the difference between accumulating mode and discarding mode, + * {@code SELECT * FROM <enter_table_name> WHERE (timing = "LATE" or timing = "ON_TIME") AND + * (triggerType = "withAllowedLateness" or triggerType = "sequential") and freeway = "5" ORDER BY + * window DESC, processingTime} + * + * <p> To see speculative results every minute, + * {@code SELECT * FROM <enter_table_name> WHERE triggerType = "speculative" and freeway = "5" + * ORDER BY window DESC, processingTime} + * + * <p> To see speculative results every five minutes after the end of the window, + * {@code SELECT * FROM <enter_table_name> WHERE triggerType = "sequential" and timing != "EARLY" + * and freeway = "5" ORDER BY window DESC, processingTime} + * + * <p> To see the first and the last pane for a freeway in a window for all the trigger types, + * {@code SELECT * FROM <enter_table_name> WHERE (isFirst = true or isLast = true) ORDER BY window} + * + * <p> To reduce the number of results for each query we can add additional where clauses. + * For example, to see the results of the default trigger, + * {@code SELECT * FROM <enter_table_name> WHERE triggerType = "default" AND freeway = "5" AND + * window = "<enter_window_interval>"} + * + * <p> The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C) + * and then exit. + * + * @since 3.5.0 + */ + +public class TriggerExample +{ + //Numeric value of fixed window duration, in minutes + public static final int WINDOW_DURATION = 30; + // Constants used in triggers. + // Speeding up ONE_MINUTE or FIVE_MINUTES helps you get an early approximation of results. + // ONE_MINUTE is used only with processing time before the end of the window + public static final Duration ONE_MINUTE = Duration.standardMinutes(1); + // FIVE_MINUTES is used only with processing time after the end of the window + public static final Duration FIVE_MINUTES = Duration.standardMinutes(5); + // ONE_DAY is used to specify the amount of lateness allowed for the data elements. + public static final Duration ONE_DAY = Duration.standardDays(1); + + /** + * This transform demonstrates using triggers to control when data is produced for each window. + * Consider an example to understand the results generated by each type of trigger. + * The example uses "freeway" as the key. Event time is the timestamp associated with the data + * element and processing time is the time when the data element gets processed in the pipeline. + * For freeway 5, suppose there are 10 elements in the [10:00:00, 10:30:00) window.
+ * Key (freeway) | Value (totalFlow) | event time | processing time + * 5 | 50 | 10:00:03 | 10:00:47 + * 5 | 30 | 10:01:00 | 10:01:03 + * 5 | 30 | 10:02:00 | 11:07:00 + * 5 | 20 | 10:04:10 | 10:05:15 + * 5 | 60 | 10:05:00 | 11:03:00 + * 5 | 20 | 10:05:01 | 11:07:30 + * 5 | 60 | 10:15:00 | 10:27:15 + * 5 | 40 | 10:26:40 | 10:26:43 + * 5 | 60 | 10:27:20 | 10:27:25 + * 5 | 60 | 10:29:00 | 11:11:00 + * + * <p> Dataflow tracks a watermark which records up to what point in event time the data is + * complete. For the purposes of the example, we'll assume the watermark is approximately 15m + * behind the current processing time. In practice, the actual value would vary over time based + * on the system's knowledge of the current PubSub delay and contents of the backlog (data + * that has not yet been processed). + * + * <p> If the watermark is 15m behind, then the window [10:00:00, 10:30:00) (in event time) would + * close at 10:44:59, when the watermark passes 10:30:00. + */ + static class CalculateTotalFlow + extends CompositeStreamTransform<ApexStream<String>, WindowedStream<SampleBean>> + { + private int windowDuration; + + CalculateTotalFlow(int windowDuration) + { + this.windowDuration = windowDuration; + } + + @Override + public WindowedStream<SampleBean> compose(ApexStream<String> inputStream) + { + // Concept #1: The default triggering behavior + // By default Dataflow uses a trigger which fires when the watermark has passed the end of the + // window. This would be written {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}. + + // The system also defaults to dropping late data -- data which arrives after the watermark + // has passed the event timestamp of the arriving element. This means that the default trigger + // will only fire once. + + // Each pane produced by the default trigger with no allowed lateness will be the first and + // last pane in the window, and will be ON_TIME. + + // The results for the example above with the default trigger and zero allowed lateness + // would be: + // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing + // 5 | 260 | 6 | true | true | ON_TIME + + // At 11:03:00 (processing time) the system watermark may have advanced to 10:54:00. As a + // result, when the data record with event time 10:05:00 arrives at 11:03:00, it is considered + // late, and dropped. + + WindowedStream<SampleBean> defaultTriggerResults = inputStream + .window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)), + new TriggerOption().discardingFiredPanes()) + .addCompositeStreams(new TotalFlow("default")); + + // Concept #2: Late data with the default trigger + // This uses the same trigger as concept #1, but allows data that is up to ONE_DAY late. This + // leads to each window staying open for ONE_DAY after the watermark has passed the end of the + // window. Any late data will result in an additional pane being fired for that same window. + + // The first pane produced will be ON_TIME and the remaining panes will be LATE. + // To definitely get the last pane when the window closes, use + // .withAllowedLateness(ONE_DAY, ClosingBehavior.FIRE_ALWAYS).
+ + // The results for the example above with the default trigger and ONE_DAY allowed lateness + // would be: + // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing + // 5 | 260 | 6 | true | false | ON_TIME + // 5 | 60 | 1 | false | false | LATE + // 5 | 30 | 1 | false | false | LATE + // 5 | 20 | 1 | false | false | LATE + // 5 | 60 | 1 | false | false | LATE + WindowedStream<SampleBean> withAllowedLatenessResults = inputStream + .window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)), + new TriggerOption().discardingFiredPanes(), + Duration.standardDays(1)) + .addCompositeStreams(new TotalFlow("withAllowedLateness")); + + // Concept #3: How to get speculative estimates + // We can specify a trigger that fires independent of the watermark, for instance after + // ONE_MINUTE of processing time. This allows us to produce speculative estimates before + // all the data is available. Since we don't have any triggers that depend on the watermark + // we don't get an ON_TIME firing. Instead, all panes are either EARLY or LATE. + + // We also use accumulatingFiredPanes to build up the results across each pane firing. + + // The results for the example above for this trigger would be: + // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing + // 5 | 80 | 2 | true | false | EARLY + // 5 | 100 | 3 | false | false | EARLY + // 5 | 260 | 6 | false | false | EARLY + // 5 | 320 | 7 | false | false | LATE + // 5 | 370 | 9 | false | false | LATE + // 5 | 430 | 10 | false | false | LATE + + ApexStream<SampleBean> speculativeResults = inputStream + .window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)), + //Trigger fires every minute + new TriggerOption().withEarlyFiringsAtEvery(Duration.standardMinutes(1)) + // After emitting each pane, it will continue accumulating the elements so that each + // approximation includes all of the previous data in addition to the newly arrived + // data. + .accumulatingFiredPanes(), + Duration.standardDays(1)) + .addCompositeStreams(new TotalFlow("speculative")); + + // Concept #4: Combining late data and speculative estimates + // We can put the previous concepts together to get EARLY estimates, an ON_TIME result, + // and LATE updates based on late data. + + // Each time a triggering condition is satisfied it advances to the next trigger. + // If there are new elements this trigger emits a window under following condition: + // > Early approximations every minute till the end of the window. + // > An on-time firing when the watermark has passed the end of the window + // > Every five minutes of late data. + + // Every pane produced will either be EARLY, ON_TIME or LATE. + + // The results for the example above for this trigger would be: + // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing + // 5 | 80 | 2 | true | false | EARLY + // 5 | 100 | 3 | false | false | EARLY + // 5 | 260 | 6 | false | false | EARLY + // [First pane fired after the end of the window] + // 5 | 320 | 7 | false | false | ON_TIME + // 5 | 430 | 10 | false | false | LATE + + // For more possibilities of how to build advanced triggers, see {@link Trigger}. 
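+ // In the Apex high-level API this composite behavior is expressed as a single TriggerOption + // chain, as the code below shows: withEarlyFiringsAtEvery(ONE_MINUTE) produces the EARLY + // panes, withLateFiringsAtEvery(FIVE_MINUTES) produces the LATE updates, and + // accumulatingFiredPanes() makes every pane carry the running total.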
+ WindowedStream<SampleBean> sequentialResults = inputStream + .window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)), + // Speculative every ONE_MINUTE + new TriggerOption().withEarlyFiringsAtEvery(Duration.standardMinutes(1)) + .withLateFiringsAtEvery(Duration.standardMinutes(5)) + // After emitting each pane, it will continue accumulating the elements so that each + // approximation includes all of the previous data in addition to the newly arrived + // data. + .accumulatingFiredPanes(), + Duration.standardDays(1)) + .addCompositeStreams(new TotalFlow("sequential")); + + return sequentialResults; + } + + } + + ////////////////////////////////////////////////////////////////////////////////////////////////// + // The remaining parts of the pipeline are needed to produce the output for each + // concept above. Not directly relevant to understanding the trigger examples. + + /** + * Calculate total flow and number of records for each freeway and format the results to TableRow + * objects, to save to BigQuery. + */ + static class TotalFlow extends + CompositeStreamTransform<WindowedStream<String>, WindowedStream<SampleBean>> + { + private String triggerType; + + public TotalFlow(String triggerType) + { + this.triggerType = triggerType; + } + + @Override + public WindowedStream<SampleBean> compose(WindowedStream<String> inputStream) + { + + WindowedStream<KeyValPair<String, Iterable<Integer>>> flowPerFreeway = inputStream + .groupByKey(new ExtractFlowInfo()); + + return flowPerFreeway + .map(new Function.MapFunction<KeyValPair<String, Iterable<Integer>>, KeyValPair<String, String>>() + { + @Override + public KeyValPair<String, String> f(KeyValPair<String, Iterable<Integer>> input) + { + Iterable<Integer> flows = input.getValue(); + Integer sum = 0; + Long numberOfRecords = 0L; + for (Integer value : flows) { + sum += value; + numberOfRecords++; + } + return new KeyValPair<>(input.getKey(), sum + "," + numberOfRecords); + } + }) + .map(new FormatTotalFlow(triggerType)); + } + } + + /** + * Format the results of the Total flow calculation to a TableRow, to save to BigQuery. + * Adds the triggerType, pane information, processing time and the window timestamp. + */ + static class FormatTotalFlow implements Function.MapFunction<KeyValPair<String, String>, SampleBean> + { + private String triggerType; + + public FormatTotalFlow(String triggerType) + { + this.triggerType = triggerType; + } + + @Override + public SampleBean f(KeyValPair<String, String> input) + { + String[] values = input.getValue().split(","); + //TODO need to have a callback to get the metadata like window id, pane id, timestamps etc. 
+ return new SampleBean(triggerType, input.getKey(), Integer.parseInt(values[0]), Long + .parseLong(values[1]), null, false, false, null, null, new Date()); + } + } + + public static class SampleBean + { + public SampleBean() + { + } + + private String triggerType; + + private String freeway; + + private int totalFlow; + + private long numberOfRecords; + + private String window; + + private boolean isFirst; + + private boolean isLast; + + private Date timing; + + private Date eventTime; + + private Date processingTime; + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SampleBean that = (SampleBean)o; + return totalFlow == that.totalFlow && + numberOfRecords == that.numberOfRecords && + isFirst == that.isFirst && + isLast == that.isLast && + Objects.equals(triggerType, that.triggerType) && + Objects.equals(freeway, that.freeway) && + Objects.equals(window, that.window) && + Objects.equals(timing, that.timing) && + Objects.equals(eventTime, that.eventTime) && + Objects.equals(processingTime, that.processingTime); + } + + @Override + public int hashCode() + { + return Objects + .hash(triggerType, freeway, totalFlow, numberOfRecords, window, isFirst, isLast, timing, eventTime, + processingTime); + } + + public SampleBean(String triggerType, String freeway, int totalFlow, long numberOfRecords, String window, boolean isFirst, boolean isLast, Date timing, Date eventTime, Date processingTime) + { + + this.triggerType = triggerType; + this.freeway = freeway; + this.totalFlow = totalFlow; + this.numberOfRecords = numberOfRecords; + this.window = window; + this.isFirst = isFirst; + this.isLast = isLast; + this.timing = timing; + this.eventTime = eventTime; + this.processingTime = processingTime; + } + + public String getTriggerType() + { + return triggerType; + } + + public void setTriggerType(String triggerType) + { + this.triggerType = triggerType; + } + + public String getFreeway() + { + return freeway; + } + + public void setFreeway(String freeway) + { + this.freeway = freeway; + } + + public int getTotalFlow() + { + return totalFlow; + } + + public void setTotalFlow(int totalFlow) + { + this.totalFlow = totalFlow; + } + + public long getNumberOfRecords() + { + return numberOfRecords; + } + + public void setNumberOfRecords(long numberOfRecords) + { + this.numberOfRecords = numberOfRecords; + } + + public String getWindow() + { + return window; + } + + public void setWindow(String window) + { + this.window = window; + } + + public boolean isFirst() + { + return isFirst; + } + + public void setFirst(boolean first) + { + isFirst = first; + } + + public boolean isLast() + { + return isLast; + } + + public void setLast(boolean last) + { + isLast = last; + } + + public Date getTiming() + { + return timing; + } + + public void setTiming(Date timing) + { + this.timing = timing; + } + + public Date getEventTime() + { + return eventTime; + } + + public void setEventTime(Date eventTime) + { + this.eventTime = eventTime; + } + + public Date getProcessingTime() + { + return processingTime; + } + + public void setProcessingTime(Date processingTime) + { + this.processingTime = processingTime; + } + } + + /** + * Extract the freeway and total flow in a reading. + * Freeway is used as key since we are calculating the total flow for each freeway. 
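+ * For illustration (hypothetical values): given a comma-separated reading whose field at index 2 + * is the freeway and whose field at index 7 is the total flow, a row carrying freeway "5" and + * flow "60" is emitted as the pair (5, 60); header rows, rows with fewer than 48 fields, and + * rows with non-positive flow are skipped.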
+ */ + static class ExtractFlowInfo implements Function.ToKeyValue<String, String, Integer> + { + @Override + public Tuple<KeyValPair<String, Integer>> f(String input) + { + String[] laneInfo = input.split(","); + if (laneInfo[0].equals("timestamp")) { + // Header row + return null; + } + if (laneInfo.length < 48) { + //Skip the invalid input. + return null; + } + String freeway = laneInfo[2]; + Integer totalFlow = tryIntegerParse(laneInfo[7]); + // Ignore the records with total flow 0 to easily understand the working of triggers. + // Skip the records with total flow -1 since they are invalid input. + if (totalFlow == null || totalFlow <= 0) { + return null; + } + return new Tuple.PlainTuple<>(new KeyValPair<>(freeway, totalFlow)); + } + } + + private static final String PUBSUB_TIMESTAMP_LABEL_KEY = "timestamp_ms"; + + public static void main(String[] args) throws Exception + { + StreamFactory.fromFolder("some folder") + .addCompositeStreams(new CalculateTotalFlow(60)); + + } + + private static Integer tryIntegerParse(String number) + { + try { + return Integer.parseInt(number); + } catch (NumberFormatException e) { + return null; + } + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/examples/highlevelapi/src/main/resources/META-INF/properties.xml b/examples/highlevelapi/src/main/resources/META-INF/properties.xml new file mode 100644 index 0000000..ead0460 --- /dev/null +++ b/examples/highlevelapi/src/main/resources/META-INF/properties.xml @@ -0,0 +1,141 @@ +<?xml version="1.0"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> +<configuration> + <!-- + <property> + <name>dt.application.{appName}.operator.{opName}.prop.{propName}</name> + <value>some-default-value (if value is not specified, it is required from the user or custom config when launching)</value> + </property> + --> + + <!-- Properties for TwitterAutoComplete, please fill out all of them to make the application work --> + <property> + <name>dt.application.TwitterAutoComplete.operator.tweetSampler.consumerKey</name> + <value></value> + </property> + <property> + <name>dt.application.TwitterAutoComplete.operator.tweetSampler.consumerSecret</name> + <value></value> + </property> + <property> + <name>dt.application.TwitterAutoComplete.operator.tweetSampler.accessToken</name> + <value></value> + </property> + <property> + <name>dt.application.TwitterAutoComplete.operator.tweetSampler.accessTokenSecret</name> + <value></value> + </property> + + <!-- Properties for StreamingWordExtract --> + <property> + <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.store.userName</name> + <value>root</value> + </property> + <property> + <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.store.password</name> + <value>password</value> + </property> + <property> + <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.store.databaseDriver</name> + <value>org.hsqldb.jdbcDriver</value> + </property> + <property> + <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.batchSize</name> + <value>5</value> + </property> + <property> + <name>dt.application.StreamingWordExtract.operator.jdbcOutput.port.input.attr.TUPLE_CLASS</name> + <value>org.apache.apex.malhar.stream.sample.complete.PojoEvent</value> + </property> + <property> + <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.store.databaseUrl</name> + <value>jdbc:hsqldb:mem:test</value> + </property> + <property> + <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.tablename</name> + <value>Test</value> + </property> + + <!-- Properties for MaxPerKeyExamples --> + <property> + <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.store.userName</name> + <value>root</value> + </property> + <property> + <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.store.password</name> + <value>password</value> + </property> + <property> + <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.store.databaseDriver</name> + <value>org.hsqldb.jdbcDriver</value> + </property> + <property> + <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.batchSize</name> + <value>5</value> + </property> + <property> + <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.port.outputPort.attr.TUPLE_CLASS</name> + <value>org.apache.apex.malhar.stream.sample.cookbook.InputPojo</value> + </property> + <property> + <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.store.databaseUrl</name> + <value>jdbc:hsqldb:mem:test</value> + </property> + <property> + <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.tableName</name> + <value>InputTable</value> + </property> + <property> + <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.query</name> + <value>SELECT * FROM InputTable;</value> + </property> + <property> + <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.store.userName</name> + <value>root</value> + </property> + <property> + <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.store.password</name> + <value>password</value> + </property> + 
<property> + <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.store.databaseDriver</name> + <value>org.hsqldb.jdbcDriver</value> + </property> + <property> + <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.batchSize</name> + <value>5</value> + </property> + <property> + <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.port.input.attr.TUPLE_CLASS</name> + <value>org.apache.apex.malhar.stream.sample.cookbook.OutputPojo</value> + </property> + <property> + <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.store.databaseUrl</name> + <value>jdbc:hsqldb:mem:test</value> + </property> + <property> + <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.tablename</name> + <value>OutputTable</value> + </property> + +</configuration> + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java ---------------------------------------------------------------------- diff --git a/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java new file mode 100644 index 0000000..c078683 --- /dev/null +++ b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.sample; + +import java.util.concurrent.Callable; + +import org.junit.Assert; +import org.junit.Test; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; +import com.datatorrent.stram.StramLocalCluster; + +/** + * Test for {@link MinimalWordCount}. 
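+ * Runs the application in local mode and, once the application's Collector reports completion, + * verifies the counts gathered for a few known words.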
+ */ +public class MinimalWordCountTest +{ + @Test + public void MinimalWordCountTest() throws Exception + { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.set("dt.application.MinimalWordCount.operator.console.silent", "true"); + MinimalWordCount app = new MinimalWordCount(); + + lma.prepareDAG(app, conf); + + LocalMode.Controller lc = lma.getController(); + ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>() + { + @Override + public Boolean call() throws Exception + { + return MinimalWordCount.Collector.isDone(); + } + }); + + lc.run(10000); + + Assert.assertTrue(MinimalWordCount.Collector.result.get("error") == 7); + Assert.assertTrue(MinimalWordCount.Collector.result.get("word") == 119); + Assert.assertTrue(MinimalWordCount.Collector.result.get("bye") == 1); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java ---------------------------------------------------------------------- diff --git a/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java new file mode 100644 index 0000000..f0c51f6 --- /dev/null +++ b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.sample; + +import java.util.Map; +import java.util.concurrent.Callable; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; +import com.datatorrent.lib.util.KeyValPair; +import com.datatorrent.stram.StramLocalCluster; + +/** + * Test for {@link WindowedWordCount}. Runs the application in local mode and verifies the + * per-window word counts gathered by the application's Collector.
+ */ +public class WindowedWordCountTest +{ + @Test + public void WindowedWordCountTest() throws Exception + { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.set("dt.application.WindowedWordCount.operator.console.silent", "true"); + lma.prepareDAG(new WindowedWordCount(), conf); + LocalMode.Controller lc = lma.getController(); + ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>() + { + @Override + public Boolean call() throws Exception + { + return WindowedWordCount.Collector.isDone(); + } + }); + + lc.run(60000); + + Assert.assertEquals(127, countSum(WindowedWordCount.Collector.getResult())); + Assert.assertEquals(28, countSumWord(WindowedWordCount.Collector.getResult(), "word2")); + Assert.assertEquals(7, countSumWord(WindowedWordCount.Collector.getResult(), "error")); + Assert.assertEquals(21, countSumWord(WindowedWordCount.Collector.getResult(), "word9")); + Assert.assertEquals(1, countSumWord(WindowedWordCount.Collector.getResult(), "bye")); + } + + public long countSum(Map<KeyValPair<Long, String>, Long> map) + { + long sum = 0; + for (long count : map.values()) { + sum += count; + } + return sum; + } + + public long countSumWord(Map<KeyValPair<Long, String>, Long> map, String word) + { + long sum = 0; + for (Map.Entry<KeyValPair<Long, String>, Long> entry : map.entrySet()) { + if (entry.getKey().getValue().equals(word)) { + sum += entry.getValue(); + } + } + return sum; + } + +} + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java ---------------------------------------------------------------------- diff --git a/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java new file mode 100644 index 0000000..4ed2d5d --- /dev/null +++ b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.stream.sample.complete; + +import java.util.concurrent.Callable; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; +import com.datatorrent.stram.StramLocalCluster; + +/** + * Testing the AutoComplete Application + */ +public class AutoCompleteTest +{ + + @Test + public void AutoCompleteTest() throws Exception + { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.set("dt.application.AutoComplete.operator.console.silent", "true"); + lma.prepareDAG(new AutoComplete(), conf); + LocalMode.Controller lc = lma.getController(); + + ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>() + { + @Override + public Boolean call() throws Exception + { + return AutoComplete.Collector.isDone(); + } + }); + + lc.run(200000); + + Assert.assertTrue(AutoComplete.Collector.getResult().containsKey("had")); + Assert.assertTrue(AutoComplete.Collector.getResult().containsKey("hadoop")); + Assert.assertEquals(2, AutoComplete.Collector.getResult().get("mapreduce").get(0).getCount()); + + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java ---------------------------------------------------------------------- diff --git a/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java new file mode 100644 index 0000000..dc9cdec --- /dev/null +++ b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.stream.sample.complete; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.concurrent.Callable; + +import org.junit.After; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.google.common.base.Throwables; +import com.datatorrent.api.LocalMode; +import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore; +import com.datatorrent.stram.StramLocalCluster; + +/** + * Testing StreamingWordExtract application + */ +public class StreamingWordExtractTest +{ + private static final String TUPLE_CLASS = "org.apache.apex.malhar.stream.sample.complete.PojoEvent"; + private static final String DB_DRIVER = "org.h2.Driver"; + private static final String DB_URL = "jdbc:h2:~/test"; + private static final String TABLE_NAME = "Test"; + private static final String USER_NAME = "root"; + private static final String PSW = "password"; + + @BeforeClass + public static void setup() + { + try { + Class.forName(DB_DRIVER).newInstance(); + + Connection con = DriverManager.getConnection(DB_URL,USER_NAME,PSW); + Statement stmt = con.createStatement(); + + String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( " + + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, " + + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, " + + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, " + + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", " + + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") " + + ")"; + stmt.executeUpdate(createMetaTable); + + String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME + + "(STRINGVALUE VARCHAR(255))"; + stmt.executeUpdate(createTable); + + } catch (Throwable e) { + throw Throwables.propagate(e); + } + } + + @After + public void cleanTable() + { + try { + Connection con = DriverManager.getConnection(DB_URL,USER_NAME,PSW); + Statement stmt = con.createStatement(); + String dropTable = "drop table " + TABLE_NAME; + stmt.executeUpdate(dropTable); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + public void setConfig(Configuration conf) + { + conf.set("dt.operator.jdbcOutput.prop.store.userName", USER_NAME); + conf.set("dt.operator.jdbcOutput.prop.store.password", PSW); + conf.set("dt.operator.jdbcOutput.prop.store.databaseDriver", DB_DRIVER); + conf.set("dt.operator.jdbcOutput.prop.batchSize", "5"); + conf.set("dt.operator.jdbcOutput.port.input.attr.TUPLE_CLASS", TUPLE_CLASS); + conf.set("dt.operator.jdbcOutput.prop.store.databaseUrl", DB_URL); + conf.set("dt.operator.jdbcOutput.prop.tablename", TABLE_NAME); + } + + public int getNumOfEventsInStore() + { + Connection con; + try { + con = DriverManager.getConnection(DB_URL,USER_NAME,PSW); + Statement stmt = con.createStatement(); + + String countQuery = "SELECT count(*) from " + TABLE_NAME; + ResultSet resultSet = stmt.executeQuery(countQuery); + resultSet.next(); + return resultSet.getInt(1); + } catch (SQLException e) { + throw new RuntimeException("fetching count", e); + } + } + + @Test + public void StreamingWordExtractTest() throws Exception + { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + setConfig(conf); + StreamingWordExtract app = new StreamingWordExtract(); + 
lma.prepareDAG(app, conf); + LocalMode.Controller lc = lma.getController(); + + ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>() + { + @Override + public Boolean call() throws Exception + { + return getNumOfEventsInStore() == 36; + } + }); + + lc.run(10000); + + Assert.assertEquals(app.getWordCount(), getNumOfEventsInStore()); + Assert.assertEquals(app.getEntriesMapped(), getNumOfEventsInStore()); + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java ---------------------------------------------------------------------- diff --git a/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java new file mode 100644 index 0000000..fddf511 --- /dev/null +++ b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.sample.complete; + +import java.util.List; +import java.util.concurrent.Callable; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; +import com.datatorrent.stram.StramLocalCluster; + +/** + * Testing the {@link TopWikipediaSessions} Application. 
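+ * Runs the application in local mode until at least 250 session tuples have been generated and + * verifies that every collected top-sessions list is ordered by descending session count.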
+ */ +public class TopWikipediaSessionsTest +{ + @Test + public void TopWikipediaSessionsTest() throws Exception + { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.set("dt.application.TopWikipediaSessions.operator.console.silent", "true"); + lma.prepareDAG(new TopWikipediaSessions(), conf); + LocalMode.Controller lc = lma.getController(); + + ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>() + { + @Override + public Boolean call() throws Exception + { + return TopWikipediaSessions.SessionGen.getTupleCount() >= 250; + } + }); + + lc.run(30000); + + for (int i = 0; i < TopWikipediaSessions.Collector.getResult().size(); i++) { + Assert.assertTrue(isInOrder(TopWikipediaSessions.Collector.getResult().get(i))); + } + } + + public boolean isInOrder(List<TopWikipediaSessions.TempWrapper> input) + { + if (input.size() == 0 || input.size() == 1) { + return true; + } + for (int i = 0; i < input.size() - 2; i++) { + if (input.get(i).getValue().getValue() < input.get(i + 1).getValue().getValue()) { + return false; + } + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java ---------------------------------------------------------------------- diff --git a/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java new file mode 100644 index 0000000..766fa60 --- /dev/null +++ b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.sample.complete; + +import java.util.Map; +import java.util.concurrent.Callable; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; +import com.datatorrent.lib.util.KeyValPair; +import com.datatorrent.stram.StramLocalCluster; + +/** + * Testing the {@link TrafficRoutes} Application. 
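+ * Runs the application in local mode until at least 100 traffic tuples have been generated and + * verifies that every collected average speed lies in the simulated 55-75 range for routes + * SDRoute1 and SDRoute2.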
+ */
+public class TrafficRoutesTest
+{
+
+  @Test
+  public void TrafficRoutesTest() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.set("dt.application.TrafficRoutes.operator.console.silent", "true");
+    lma.prepareDAG(new TrafficRoutes(), conf);
+    LocalMode.Controller lc = lma.getController();
+
+    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        return TrafficRoutes.InfoGen.getTupleCount() >= 100;
+      }
+    });
+
+    lc.run(60000);
+
+    Assert.assertFalse(TrafficRoutes.Collector.getResult().isEmpty());
+    for (Map.Entry<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> entry : TrafficRoutes.Collector.getResult().entrySet()) {
+      Assert.assertTrue(entry.getValue().getKey() <= 75);
+      Assert.assertTrue(entry.getValue().getKey() >= 55);
+      Assert.assertTrue(entry.getKey().getValue().equals("SDRoute1") || entry.getKey().getValue().equals("SDRoute2"));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoCompleteTest.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoCompleteTest.java b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoCompleteTest.java
new file mode 100644
index 0000000..9ba2f25
--- /dev/null
+++ b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoCompleteTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+/**
+ * Testing the TwitterAutoComplete Application. To run this test, create an app at
+ * https://apps.twitter.com, generate your consumer and access keys and tokens, and set the
+ * following properties for the application before running it:
+ * your application consumer key,
+ * your application consumer secret,
+ * your Twitter access token, and
+ * your Twitter access token secret.
+ *
+ * This test is mainly for local demonstration purposes. The default run time is one minute;
+ * adjust it to the duration you need before running the test.
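+ *
+ * The test is annotated with {@code @Ignore} so that it is skipped by default, since it
+ * requires live Twitter credentials.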
+ */
+public class TwitterAutoCompleteTest
+{
+  private static final Logger logger = LoggerFactory.getLogger(TwitterAutoCompleteTest.class);
+
+  @Test
+  @Ignore
+  public void TwitterAutoCompleteTest() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    // Uncomment the following lines and replace each YOUR_XXX placeholder with your own credentials.
+    //conf.set("dt.application.TwitterAutoComplete.operator.tweetSampler.consumerKey", "YOUR_CONSUMERKEY");
+    //conf.set("dt.application.TwitterAutoComplete.operator.tweetSampler.consumerSecret", "YOUR_CONSUMERSECRET");
+    //conf.set("dt.application.TwitterAutoComplete.operator.tweetSampler.accessToken", "YOUR_ACCESSTOKEN");
+    //conf.set("dt.application.TwitterAutoComplete.operator.tweetSampler.accessTokenSecret", "YOUR_TOKENSECRET");
+    lma.prepareDAG(new TwitterAutoComplete(), conf);
+    LocalMode.Controller lc = lma.getController();
+    long start = System.currentTimeMillis();
+    lc.run(60000); // Set your desired run time here.
+    long end = System.currentTimeMillis();
+    long time = end - start;
+    logger.info("Test used " + time + " ms");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java
new file mode 100644
index 0000000..1e14fff
--- /dev/null
+++ b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+import java.util.concurrent.Callable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * Test for {@link CombinePerKeyExamples}.
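+ *
+ * Runs the example in local mode until the Collector reports completion and then checks
+ * that the second-to-last collected result contains the combined corpus string.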
+ */
+public class CombinePerKeyExamplesTest
+{
+  @Test
+  public void CombinePerKeyExamplesTest() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.set("dt.application.CombinePerKeyExamples.operator.console.silent", "true");
+    CombinePerKeyExamples app = new CombinePerKeyExamples();
+
+    lma.prepareDAG(app, conf);
+
+    LocalMode.Controller lc = lma.getController();
+    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        return CombinePerKeyExamples.Collector.isDone();
+      }
+    });
+    lc.run(100000);
+
+    Assert.assertTrue(CombinePerKeyExamples.Collector.getResult().get(CombinePerKeyExamples.Collector.getResult().size() - 2).getCorpus().contains("1, 2, 3, 4, 5, 6, 7, 8"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java
new file mode 100644
index 0000000..7f93f50
--- /dev/null
+++ b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+import java.util.concurrent.Callable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * Test for {@link DeDupExample}.
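+ *
+ * Runs the example in local mode until the Collector reports completion and then expects
+ * the deduplicated result to contain 9 distinct words.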
+ */
+public class DeDupExampleTest
+{
+  @Test
+  public void DeDupExampleTest() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.set("dt.application.DeDupExample.operator.console.silent", "true");
+    DeDupExample app = new DeDupExample();
+    lma.prepareDAG(app, conf);
+    LocalMode.Controller lc = lma.getController();
+    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        return DeDupExample.Collector.isDone();
+      }
+    });
+    lc.run(50000);
+
+    Assert.assertEquals(9, DeDupExample.Collector.getResult().getValue().size());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamplesTest.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamplesTest.java b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamplesTest.java
new file mode 100644
index 0000000..ec28b40
--- /dev/null
+++ b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamplesTest.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Throwables;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * Test for MaxPerKeyExamples Application.
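+ *
+ * Provisions an H2 database with the JdbcTransactionalStore meta table plus input and
+ * output tables, seeds three temperature readings, runs the DAG until the output table
+ * holds two distinct rows, and asserts the expected maximum mean temperature per month.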
+ */
+public class MaxPerKeyExamplesTest
+{
+
+  private static final String INPUT_TUPLE_CLASS = "org.apache.apex.malhar.stream.sample.cookbook.InputPojo";
+  private static final String OUTPUT_TUPLE_CLASS = "org.apache.apex.malhar.stream.sample.cookbook.OutputPojo";
+  private static final String DB_DRIVER = "org.h2.Driver";
+  private static final String DB_URL = "jdbc:h2:~/test";
+  private static final String INPUT_TABLE = "InputTable";
+  private static final String OUTPUT_TABLE = "OutputTable";
+  private static final String USER_NAME = "root";
+  private static final String PSW = "password";
+  private static final String QUERY = "SELECT * FROM " + INPUT_TABLE + ";";
+
+  private static final double[] MEANTEMPS = {85.3, 75.4};
+
+  @BeforeClass
+  public static void setup()
+  {
+    try {
+      Class.forName(DB_DRIVER).newInstance();
+
+      Connection con = DriverManager.getConnection(DB_URL, USER_NAME, PSW);
+      Statement stmt = con.createStatement();
+
+      String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
+          + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
+          + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
+          + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, "
+          + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", "
+          + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
+          + ")";
+      stmt.executeUpdate(createMetaTable);
+
+      String createInputTable = "CREATE TABLE IF NOT EXISTS " + INPUT_TABLE
+          + "(MONTH INT(2) not NULL, DAY INT(2), YEAR INT(4), MEANTEMP DOUBLE(10) )";
+      stmt.executeUpdate(createInputTable);
+
+      String createOutputTable = "CREATE TABLE IF NOT EXISTS " + OUTPUT_TABLE
+          + "(MONTH INT(2) not NULL, MEANTEMP DOUBLE(10) )";
+      stmt.executeUpdate(createOutputTable);
+
+      String cleanTable = "truncate table " + INPUT_TABLE;
+      stmt.executeUpdate(cleanTable);
+
+      String sql = "INSERT INTO " + INPUT_TABLE + " VALUES (6, 21, 2014, 85.3)";
+      stmt.executeUpdate(sql);
+      sql = "INSERT INTO " + INPUT_TABLE + " VALUES (7, 20, 2014, 75.4)";
+      stmt.executeUpdate(sql);
+      sql = "INSERT INTO " + INPUT_TABLE + " VALUES (6, 18, 2014, 45.3)";
+      stmt.executeUpdate(sql);
+
+    } catch (Throwable e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @AfterClass
+  public static void cleanup()
+  {
+    try {
+      Class.forName(DB_DRIVER).newInstance();
+
+      Connection con = DriverManager.getConnection(DB_URL, USER_NAME, PSW);
+      Statement stmt = con.createStatement();
+
+      String dropInputTable = "DROP TABLE " + INPUT_TABLE;
+      stmt.executeUpdate(dropInputTable);
+
+      String dropOutputTable = "DROP TABLE " + OUTPUT_TABLE;
+      stmt.executeUpdate(dropOutputTable);
+
+    } catch (Throwable e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  public void setConfig(Configuration conf)
+  {
+    conf.set("dt.operator.jdbcInput.prop.store.userName", USER_NAME);
+    conf.set("dt.operator.jdbcInput.prop.store.password", PSW);
+    conf.set("dt.operator.jdbcInput.prop.store.databaseDriver", DB_DRIVER);
+    conf.set("dt.operator.jdbcInput.prop.batchSize", "5");
+    conf.set("dt.operator.jdbcInput.port.outputPort.attr.TUPLE_CLASS", INPUT_TUPLE_CLASS);
+    conf.set("dt.operator.jdbcInput.prop.store.databaseUrl", DB_URL);
+    conf.set("dt.operator.jdbcInput.prop.tableName", INPUT_TABLE);
+    conf.set("dt.operator.jdbcInput.prop.query", QUERY);
+
+    conf.set("dt.operator.jdbcOutput.prop.store.userName", USER_NAME);
+    conf.set("dt.operator.jdbcOutput.prop.store.password", PSW);
+    conf.set("dt.operator.jdbcOutput.prop.store.databaseDriver", DB_DRIVER);
+    conf.set("dt.operator.jdbcOutput.prop.batchSize", "5");
+    conf.set("dt.operator.jdbcOutput.port.input.attr.TUPLE_CLASS", OUTPUT_TUPLE_CLASS);
+    conf.set("dt.operator.jdbcOutput.prop.store.databaseUrl", DB_URL);
+    conf.set("dt.operator.jdbcOutput.prop.tablename", OUTPUT_TABLE);
+  }
+
+  public int getNumEntries()
+  {
+    Connection con;
+    try {
+      con = DriverManager.getConnection(DB_URL, USER_NAME, PSW);
+      Statement stmt = con.createStatement();
+
+      String countQuery = "SELECT count(DISTINCT (MONTH, MEANTEMP)) from " + OUTPUT_TABLE;
+      ResultSet resultSet = stmt.executeQuery(countQuery);
+      resultSet.next();
+      return resultSet.getInt(1);
+    } catch (SQLException e) {
+      throw new RuntimeException("fetching count", e);
+    }
+  }
+
+  public Map<Integer, Double> getMaxMeanTemp()
+  {
+    Map<Integer, Double> result = new HashMap<>();
+    Connection con;
+    try {
+      con = DriverManager.getConnection(DB_URL, USER_NAME, PSW);
+      Statement stmt = con.createStatement();
+
+      String selectQuery = "SELECT DISTINCT * from " + OUTPUT_TABLE;
+      ResultSet resultSet = stmt.executeQuery(selectQuery);
+      while (resultSet.next()) {
+        result.put(resultSet.getInt("MONTH"), resultSet.getDouble("MEANTEMP"));
+      }
+      return result;
+    } catch (SQLException e) {
+      throw new RuntimeException("fetching max mean temperatures", e);
+    }
+  }
+
+  @Test
+  public void MaxPerKeyExampleTest() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    setConfig(conf);
+
+    MaxPerKeyExamples app = new MaxPerKeyExamples();
+
+    lma.prepareDAG(app, conf);
+
+    LocalMode.Controller lc = lma.getController();
+    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        return getNumEntries() == 2;
+      }
+    });
+
+    lc.run(5000);
+
+    double[] result = new double[2];
+    result[0] = getMaxMeanTemp().get(6);
+    result[1] = getMaxMeanTemp().get(7);
+    Assert.assertArrayEquals(MEANTEMPS, result, 0.0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/resources/data/word.txt
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/test/resources/data/word.txt b/examples/highlevelapi/src/test/resources/data/word.txt
new file mode 100644
index 0000000..7e28409
--- /dev/null
+++ b/examples/highlevelapi/src/test/resources/data/word.txt
@@ -0,0 +1,2 @@
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/test/resources/log4j.properties b/examples/highlevelapi/src/test/resources/log4j.properties
new file mode 100644
index 0000000..592eb19
--- /dev/null
+++ b/examples/highlevelapi/src/test/resources/log4j.properties
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=INFO,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=INFO
+#log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=WARN
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+#log4j.logger.org=INFO
+
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=INFO
+log4j.logger.org.apache.apex=INFO

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/resources/sampletweets.txt
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/test/resources/sampletweets.txt b/examples/highlevelapi/src/test/resources/sampletweets.txt
new file mode 100644
index 0000000..c113130
--- /dev/null
+++ b/examples/highlevelapi/src/test/resources/sampletweets.txt
@@ -0,0 +1,44 @@
+Last week, I published a blog announcing that Apex was accepted as an Apache incubator project.
+This week, I'll give you a little more detail on what Apache Apex is, and why it's important.
+
+Apache #Hadoop has been around for over a decade. It has become the de-facto big data platform,
+allowing enterprises to transform their business operations by turning big data into something useful, meaningful,
+and revenue-generating. #Hadoop promised the enablement of big data without incurring the costs you would normally think such powerful
+processing systems would demand. This tremendous promise of transforming business operations continues to fuel high growth in the industry.
+
+It all got started when Hadoop engineers at Yahoo! asked, "How can we build an efficient search indexing capability?"
+The ensuing iterations and some inspiration resulted in the #MapReduce programming model. Although powerful, MapReduce wasn't perfect.
+
+Mastering MapReduce required a steep learning curve. Migrating applications to MapReduce required an almost complete rewrite.
+Equally worrisome was the fact that MapReduce had batch processing paradigm and "compute going to data" at its core,
+thus posing a deterrent to Hadoop realizing its true potential.
+
+Expectedly enough, #MapReduce was an impediment that did little to bolster productization of big data.
+Not to be deterred, there were faster substitutes for MapReduce. Just like Hadoop, these models required deeper expertise, were tough to operate and difficult to master.
+As such, #Hadoop disrupted the way big data needs were handled, but remained largely under-productized.
+
+A decade after Hadoop was started, only a small percentage of big data projects are in production.
+Data is growing rapidly and the ability to harness big data has become a decisive competitive advantage.
+MapReduce impedes this demand (actually more of a scramble) to transform into a data-driven business.
+
+In hindsight, it is clear that in the early days, the subsequent success of Hadoop was not anticipated.
+If they had anticipated Hadoop's success, the question would have been, "What can we do with massively distributed resources?"
+The answer to this question, which came about soon after, was YARN (Hadoop 2.0), the next generation Hadoop.
+For the first time, #YARN brought the capability of exploring how distributed resources handling big data could perform "a lot of things",
+thus going beyond the early MapReduce paradigm, and in a way beyond batch or even compute-going-to-data paradigms.
+YARN presented the capability to allow big data to not just become big in size, but broader in use cases. With its enabling capability as a Hadoop facilitator,
+YARN has pushed Hadoop towards realizing its true potential. The Hadoop predicament is similar to what
+cellphones would have been without the more popular features such as messaging and internet connectivity.
+
+
+In their early years, cellphones upset the landline market, but did not foster an immediate market furor till
+it transformed into the new-age "smartphone" with impressive features.
+YARN is most certainly the enabling factor for big data dreaming bigger and wider, and with it, Hadoop 2.0 is now a true de-facto distributed operating system.
+
+What's needed is bleeding edge YARN-based platforms capable of radically realizing Hadoop's potential
+
+Now is the right time to not only productize big data, but to see how setting it in motion can ensure realization of greater business goals.
+A Herculean task, this demands platforms that are easy to deploy, require nothing beyond everyday IT expertise, can effortlessly integrate with an existing IT infrastructure while ensuring ease of migration.
+The new-age Hadoop platforms need to be designed with an approach to reduce time-to-market by shortening the application lifecycle, from building to launching, thus quickening the realization of revenue for businesses.
+They will also have to reduce time for developers to develop, devOps to operationalize, and finally reduce time to insight for business.
+Platforms such as these will need to learn, adapt, and change to meet the burgeoning needs of the big data world.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/resources/wordcount/word.txt
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/test/resources/wordcount/word.txt b/examples/highlevelapi/src/test/resources/wordcount/word.txt
new file mode 100644
index 0000000..edd0f51
--- /dev/null
+++ b/examples/highlevelapi/src/test/resources/wordcount/word.txt
@@ -0,0 +1,8 @@
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error
+bye
\ No newline at end of file
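All of the tests added in this commit follow one local-mode harness: prepare the DAG, register an exit condition with StramLocalCluster, run with a timeout, and assert on the collected output. Below is a minimal sketch of that skeleton; MyApplication, its static isDone() check, and the property key are hypothetical stand-ins for any of the applications above, not part of this commit.

import java.util.concurrent.Callable;

import org.junit.Assert;
import org.junit.Test;

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.LocalMode;
import com.datatorrent.stram.StramLocalCluster;

public class LocalModeSkeletonTest
{
  @Test
  public void testApplication() throws Exception
  {
    LocalMode lma = LocalMode.newInstance();
    Configuration conf = new Configuration(false);  // start from an empty configuration
    conf.set("dt.application.MyApplication.operator.console.silent", "true");  // hypothetical app name
    lma.prepareDAG(new MyApplication(), conf);      // MyApplication is a hypothetical StreamingApplication
    LocalMode.Controller lc = lma.getController();

    // End the local run early once the application-specific completion check holds.
    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
    {
      @Override
      public Boolean call() throws Exception
      {
        return MyApplication.isDone();              // hypothetical static completion check
      }
    });

    lc.run(30000);  // hard upper bound in milliseconds; the exit condition usually fires sooner

    Assert.assertTrue(MyApplication.isDone());
  }
}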
