http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a017dfaa/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
----------------------------------------------------------------------
diff --cc examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
index 4df5fe7,0000000..ab6b28b
mode 100644,000000..100644
--- a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
@@@ -1,127 -1,0 +1,127 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.joda.time.Duration;
+
++import org.apache.apex.malhar.lib.function.Function;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.accumulation.RemoveDuplicates;
+import org.apache.apex.malhar.stream.api.ApexStream;
- import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * Beam DeDupExample.
+ *
+ * @since 3.5.0
+ */
+@ApplicationAnnotation(name = "DeDupExample")
+public class DeDupExample implements StreamingApplication
+{
+
+  public static class Collector extends BaseOperator
+  {
+    private static Tuple.WindowedTuple<List<String>> result;
+    private static boolean done = false;
+
+    public static Tuple.WindowedTuple<List<String>> getResult()
+    {
+      return result;
+    }
+
+    public static boolean isDone()
+    {
+      return done;
+    }
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      super.setup(context);
+      result = new Tuple.WindowedTuple<>();
+      done = false;
+    }
+
+    public transient DefaultInputPort<Tuple.WindowedTuple<List<String>>> input = new DefaultInputPort<Tuple.WindowedTuple<List<String>>>()
+    {
+      @Override
+      public void process(Tuple.WindowedTuple<List<String>> tuple)
+      {
+        result = tuple;
+        if (result.getValue().contains("bye")) {
+          done = true;
+        }
+      }
+    };
+  }
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    Collector collector = new Collector();
+
+    // Create a stream that reads from files in a local folder and outputs lines one by one downstream.
+    ApexStream<String> stream = StreamFactory.fromFolder("./src/test/resources/wordcount", name("textInput"))
+
+        // Extract all the words from the input line of text.
+        .flatMap(new Function.FlatMapFunction<String, String>()
+        {
+          @Override
+          public Iterable<String> f(String input)
+          {
+            return Arrays.asList(input.split("[\\p{Punct}\\s]+"));
+          }
+        }, name("ExtractWords"))
+
+        // Change the words to lower case, and shut down the app when the word "bye" is detected.
+        .map(new Function.MapFunction<String, String>()
+        {
+          @Override
+          public String f(String input)
+          {
+            return input.toLowerCase();
+          }
+        }, name("ToLowerCase"));
+
+    // Apply window and trigger option.
+    stream.window(new WindowOption.GlobalWindow(),
+        new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(1)))
+
+        // Remove the duplicate words and print out the result.
+        .accumulate(new RemoveDuplicates<String>(), name("RemoveDuplicates"))
+        .print(name("console"))
+        .endWith(collector, collector.input)
+        .populateDag(dag);
+  }
+}
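
For a quick local run of the DeDupExample application above, a minimal sketch using Apex's embedded LocalMode harness (the class name and wait loop here are illustrative, not part of this commit):

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.LocalMode;

public class DeDupExampleLocalRun
{
  public static void main(String[] args) throws Exception
  {
    LocalMode lma = LocalMode.newInstance();
    // An empty Configuration suffices here since the app hardcodes its input folder.
    lma.prepareDAG(new DeDupExample(), new Configuration(false));
    LocalMode.Controller lc = lma.getController();
    lc.runAsync();
    // The Collector flips its static "done" flag once the word "bye" is seen.
    while (!DeDupExample.Collector.isDone()) {
      Thread.sleep(500);
    }
    System.out.println("Deduplicated words: " + DeDupExample.Collector.getResult().getValue());
    lc.shutdown();
  }
}
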
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a017dfaa/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
----------------------------------------------------------------------
diff --cc examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
index 9fd9495,0000000..f28b96a
mode 100644,000000..100644
--- a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
@@@ -1,205 -1,0 +1,205 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+import java.util.List;
+
++import org.apache.apex.malhar.lib.function.Function;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.accumulation.Max;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
+import org.apache.apex.malhar.stream.api.WindowedStream;
- import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import static java.sql.Types.DOUBLE;
+import static java.sql.Types.INTEGER;
+
+import com.google.common.collect.Lists;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.db.jdbc.JdbcFieldInfo;
+import com.datatorrent.lib.db.jdbc.JdbcPOJOInputOperator;
+import com.datatorrent.lib.db.jdbc.JdbcPOJOInsertOutputOperator;
+import com.datatorrent.lib.db.jdbc.JdbcStore;
+import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.KeyValPair;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * MaxPerKeyExamples Application from Beam.
+ *
+ * @since 3.5.0
+ */
+@ApplicationAnnotation(name = "MaxPerKeyExamples")
+public class MaxPerKeyExamples implements StreamingApplication
+{
+
+  /**
+   * A map function to extract the mean temperature from {@link InputPojo}.
+   */
+  public static class ExtractTempFn implements Function.MapFunction<InputPojo, KeyValPair<Integer, Double>>
+  {
+    @Override
+    public KeyValPair<Integer, Double> f(InputPojo row)
+    {
+      Integer month = row.getMonth();
+      Double meanTemp = row.getMeanTemp();
+      return new KeyValPair<Integer, Double>(month, meanTemp);
+    }
+  }
+
+  /**
+   * A map function to format output to {@link OutputPojo}.
+   */
+  public static class FormatMaxesFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<Integer, Double>>, OutputPojo>
+  {
+    @Override
+    public OutputPojo f(Tuple.WindowedTuple<KeyValPair<Integer, Double>> input)
+    {
+      OutputPojo row = new OutputPojo();
+      row.setMonth(input.getValue().getKey());
+      row.setMeanTemp(input.getValue().getValue());
+      return row;
+    }
+  }
+
+  /**
+   * A composite transformation to perform three tasks:
+   * 1. extract the month and its mean temperature from the input pojo.
+   * 2. find the maximum mean temperature for every month.
+   * 3. format the result into an output pojo object.
+   */
+  public static class MaxMeanTemp extends CompositeStreamTransform<WindowedStream<InputPojo>, WindowedStream<OutputPojo>>
+  {
+    @Override
+    public WindowedStream<OutputPojo> compose(WindowedStream<InputPojo> rows)
+    {
+      // InputPojo... => <month, meanTemp>...
+      WindowedStream<KeyValPair<Integer, Double>> temps = rows.map(new ExtractTempFn(), name("ExtractTempFn"));
+
+      // <month, meanTemp>... => <month, max mean temp>...
+      WindowedStream<Tuple.WindowedTuple<KeyValPair<Integer, Double>>> tempMaxes =
+          temps.accumulateByKey(new Max<Double>(),
+              new Function.ToKeyValue<KeyValPair<Integer, Double>, Integer, Double>()
+              {
+                @Override
+                public Tuple<KeyValPair<Integer, Double>> f(KeyValPair<Integer, Double> input)
+                {
+                  return new Tuple.WindowedTuple<KeyValPair<Integer, Double>>(Window.GlobalWindow.INSTANCE, input);
+                }
+              }, name("MaxPerMonth"));
+
+      // <month, max>... => OutputPojo...
+      WindowedStream<OutputPojo> results = tempMaxes.map(new FormatMaxesFn(), name("FormatMaxesFn"));
+
+      return results;
+    }
+  }
+
+  /**
+   * Method to set field info for {@link JdbcPOJOInputOperator}.
+   * @return the list of input field infos
+   */
+  private List<FieldInfo> addInputFieldInfos()
+  {
+    List<FieldInfo> fieldInfos = Lists.newArrayList();
+    fieldInfos.add(new FieldInfo("MONTH", "month", FieldInfo.SupportType.INTEGER));
+    fieldInfos.add(new FieldInfo("DAY", "day", FieldInfo.SupportType.INTEGER));
+    fieldInfos.add(new FieldInfo("YEAR", "year", FieldInfo.SupportType.INTEGER));
+    fieldInfos.add(new FieldInfo("MEANTEMP", "meanTemp", FieldInfo.SupportType.DOUBLE));
+    return fieldInfos;
+  }
+
+  /**
+   * Method to set field info for {@link JdbcPOJOInsertOutputOperator}.
+   * @return the list of output field infos
+   */
+  private List<JdbcFieldInfo> addOutputFieldInfos()
+  {
+    List<JdbcFieldInfo> fieldInfos = Lists.newArrayList();
+    fieldInfos.add(new JdbcFieldInfo("MONTH", "month", JdbcFieldInfo.SupportType.INTEGER, INTEGER));
+    fieldInfos.add(new JdbcFieldInfo("MEANTEMP", "meanTemp", JdbcFieldInfo.SupportType.DOUBLE, DOUBLE));
+    return fieldInfos;
+  }
+
+  /**
+   * Populate the dag using the High-Level API.
+   * @param dag
+   * @param conf
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    JdbcPOJOInputOperator jdbcInput = new JdbcPOJOInputOperator();
+    jdbcInput.setFieldInfos(addInputFieldInfos());
+
+    JdbcStore store = new JdbcStore();
+    jdbcInput.setStore(store);
+
+    JdbcPOJOInsertOutputOperator jdbcOutput = new JdbcPOJOInsertOutputOperator();
+    jdbcOutput.setFieldInfos(addOutputFieldInfos());
+    JdbcTransactionalStore outputStore = new JdbcTransactionalStore();
+    jdbcOutput.setStore(outputStore);
+
+    // Create a stream that reads from a Jdbc Input.
+    ApexStream<Object> stream = StreamFactory.fromInput(jdbcInput, jdbcInput.outputPort, name("jdbcInput"))
+
+        // Apply window and trigger option to the stream.
+        .window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
+
+        // Because Jdbc Input sends out a stream of Object, we need to cast them to InputPojo.
+        .map(new Function.MapFunction<Object, InputPojo>()
+        {
+          @Override
+          public InputPojo f(Object input)
+          {
+            return (InputPojo)input;
+          }
+        }, name("ObjectToInputPojo"))
+
+        // Plug the composite transformation into the stream to calculate the maximum temperature for each month.
+        .addCompositeStreams(new MaxMeanTemp())
+
+        // Cast the resulting OutputPojo to Object for Jdbc Output to consume.
+        .map(new Function.MapFunction<OutputPojo, Object>()
+        {
+          @Override
+          public Object f(OutputPojo input)
+          {
+            return (Object)input;
+          }
+        }, name("OutputPojoToObject"))
+
+        // Output the result to Jdbc Output.
+        .endWith(jdbcOutput, jdbcOutput.input, name("jdbcOutput"));
+
+    stream.populateDag(dag);
+
+  }
+}
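
Note that the JdbcStore and JdbcTransactionalStore instances above carry no connection settings in code; those are expected to come from external configuration. A minimal sketch of the programmatic equivalent, assuming the standard setters on JdbcStore and JdbcPOJOInputOperator (the driver, URL, and table name are placeholders):

import com.datatorrent.lib.db.jdbc.JdbcPOJOInputOperator;
import com.datatorrent.lib.db.jdbc.JdbcStore;

public class JdbcConfigSketch
{
  // Programmatic stand-in for the connection settings MaxPerKeyExamples
  // expects from configuration (all values below are placeholders).
  static void configure(JdbcPOJOInputOperator jdbcInput)
  {
    JdbcStore store = new JdbcStore();
    store.setDatabaseDriver("org.hsqldb.jdbcDriver");
    store.setDatabaseUrl("jdbc:hsqldb:mem:testdb");
    jdbcInput.setStore(store);
    jdbcInput.setTableName("InputTable");
  }
}
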
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a017dfaa/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java
----------------------------------------------------------------------
diff --cc examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java
index 962faa5,0000000..2fa7619
mode 100644,000000..100644
--- a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java
@@@ -1,577 -1,0 +1,577 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+import java.util.Date;
+import java.util.Objects;
+
+import org.joda.time.Duration;
+
++import org.apache.apex.malhar.lib.function.Function;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
+import org.apache.apex.malhar.stream.api.WindowedStream;
- import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * This example illustrates the basic concepts behind triggering. It shows how to use different
+ * trigger definitions to produce partial (speculative) results before all the data is processed and
+ * to control when updated results are produced for late data. The example performs a streaming
+ * analysis of the data coming in from PubSub and writes the results to BigQuery. It divides the
+ * data into {@link Window windows} to be processed, and demonstrates using various kinds of
+ * {@link org.apache.beam.sdk.transforms.windowing.Trigger triggers} to control when the results for
+ * each window are emitted.
+ *
+ * <p> This example uses a portion of real traffic data from San Diego freeways. It contains
+ * readings from sensor stations set up along each freeway. Each sensor reading includes a
+ * calculation of the 'total flow' across all lanes in that freeway direction.
+ *
+ * <p> Concepts:
+ * <pre>
+ *   1. The default triggering behavior
+ *   2. Late data with the default trigger
+ *   3. How to get speculative estimates
+ *   4. Combining late data and speculative estimates
+ * </pre>
+ *
+ * <p> Before running this example, it will be useful to familiarize yourself with Dataflow triggers
+ * and understand the concept of 'late data'.
+ * See: <a href="https://cloud.google.com/dataflow/model/triggers">
+ * https://cloud.google.com/dataflow/model/triggers </a> and
+ * <a href="https://cloud.google.com/dataflow/model/windowing#Advanced">
+ * https://cloud.google.com/dataflow/model/windowing#Advanced </a>
+ *
+ * <p> The example pipeline reads data from a Pub/Sub topic. By default, running the example will
+ * also run an auxiliary pipeline to inject data from the default {@code --input} file to the
+ * {@code --pubsubTopic}. The auxiliary pipeline puts a timestamp on the injected data so that the
+ * example pipeline can operate on <i>event time</i> (rather than arrival time). The auxiliary
+ * pipeline also randomly simulates late data, by setting the timestamps of some of the data
+ * elements to be in the past. You may override the default {@code --input} with the file of your
+ * choosing or set {@code --input=""} which will disable the automatic Pub/Sub injection, and allow
+ * you to use a separate tool to publish to the given topic.
+ *
+ * <p> The example is configured to use the default Pub/Sub topic and the default BigQuery table
+ * from the example common package (there are no defaults for a general Dataflow pipeline).
+ * You can override them by using the {@code --pubsubTopic}, {@code --bigQueryDataset}, and
+ * {@code --bigQueryTable} options. If the Pub/Sub topic or the BigQuery table do not exist,
+ * the example will try to create them.
+ *
+ * <p> The pipeline outputs its results to a BigQuery table.
+ * Here are some queries you can use to see interesting results:
+ * Replace {@code <enter_table_name>} in the query below with the name of the BigQuery table.
+ * Replace {@code <enter_window_interval>} in the query below with the window interval.
+ *
+ * <p> To see the results of the default trigger,
+ * Note: When you start up your pipeline, you'll initially see results from 'late' data. Wait after
+ * the window duration, until the first pane of non-late data has been emitted, to see more
+ * interesting results.
+ * {@code SELECT * FROM <enter_table_name> WHERE triggerType = "default" ORDER BY window DESC}
+ *
+ * <p> To see the late data, i.e. data dropped by the default trigger,
+ * {@code SELECT * FROM <enter_table_name> WHERE triggerType = "withAllowedLateness" and
+ * (timing = "LATE" or timing = "ON_TIME") and freeway = "5" ORDER BY window DESC, processingTime}
+ *
+ * <p> To see the difference between accumulation mode and discarding mode,
+ * {@code SELECT * FROM <enter_table_name> WHERE (timing = "LATE" or timing = "ON_TIME") AND
+ * (triggerType = "withAllowedLateness" or triggerType = "sequential") and freeway = "5" ORDER BY
+ * window DESC, processingTime}
+ *
+ * <p> To see speculative results every minute,
+ * {@code SELECT * FROM <enter_table_name> WHERE triggerType = "speculative" and freeway = "5"
+ * ORDER BY window DESC, processingTime}
+ *
+ * <p> To see speculative results every five minutes after the end of the window,
+ * {@code SELECT * FROM <enter_table_name> WHERE triggerType = "sequential" and timing != "EARLY"
+ * and freeway = "5" ORDER BY window DESC, processingTime}
+ *
+ * <p> To see the first and the last pane for a freeway in a window for all the trigger types,
+ * {@code SELECT * FROM <enter_table_name> WHERE (isFirst = true or isLast = true) ORDER BY window}
+ *
+ * <p> To reduce the number of results for each query we can add additional where clauses.
+ * For example, to see the results of the default trigger,
+ * {@code SELECT * FROM <enter_table_name> WHERE triggerType = "default" AND freeway = "5" AND
+ * window = "<enter_window_interval>"}
+ *
+ * <p> The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C)
+ * and then exit.
+ *
+ * @since 3.5.0
+ */
+
+public class TriggerExample
+{
+  // Numeric value of fixed window duration, in minutes
+  public static final int WINDOW_DURATION = 30;
+  // Constants used in triggers.
+  // Speeding up ONE_MINUTE or FIVE_MINUTES helps you get an early approximation of results.
+  // ONE_MINUTE is used only with processing time before the end of the window
+  public static final Duration ONE_MINUTE = Duration.standardMinutes(1);
+  // FIVE_MINUTES is used only with processing time after the end of the window
+  public static final Duration FIVE_MINUTES = Duration.standardMinutes(5);
+  // ONE_DAY is used to specify the amount of lateness allowed for the data elements.
+  public static final Duration ONE_DAY = Duration.standardDays(1);
+
+  /**
+   * This transform demonstrates using triggers to control when data is produced for each window.
+   * Consider an example to understand the results generated by each type of trigger.
+   * The example uses "freeway" as the key. Event time is the timestamp associated with the data
+   * element and processing time is the time when the data element gets processed in the pipeline.
+   * For freeway 5, suppose there are 10 elements in the [10:00:00, 10:30:00) window.
+   * Key (freeway) | Value (totalFlow) | event time | processing time
+   * 5             | 50                | 10:00:03   | 10:00:47
+   * 5             | 30                | 10:01:00   | 10:01:03
+   * 5             | 30                | 10:02:00   | 11:07:00
+   * 5             | 20                | 10:04:10   | 10:05:15
+   * 5             | 60                | 10:05:00   | 11:03:00
+   * 5             | 20                | 10:05:01   | 11:07:30
+   * 5             | 60                | 10:15:00   | 10:27:15
+   * 5             | 40                | 10:26:40   | 10:26:43
+   * 5             | 60                | 10:27:20   | 10:27:25
+   * 5             | 60                | 10:29:00   | 11:11:00
+   *
+   * <p> Dataflow tracks a watermark which records up to what point in event time the data is
+   * complete. For the purposes of the example, we'll assume the watermark is approximately 15m
+   * behind the current processing time. In practice, the actual value would vary over time based
+   * on the system's knowledge of the current PubSub delay and contents of the backlog (data
+   * that has not yet been processed).
+   *
+   * <p> If the watermark is 15m behind, then the window [10:00:00, 10:30:00) (in event time) would
+   * close at 10:44:59, when the watermark passes 10:30:00.
+   */
+  static class CalculateTotalFlow
+      extends CompositeStreamTransform<ApexStream<String>, WindowedStream<SampleBean>>
+  {
+    private int windowDuration;
+
+    CalculateTotalFlow(int windowDuration)
+    {
+      this.windowDuration = windowDuration;
+    }
+
+    @Override
+    public WindowedStream<SampleBean> compose(ApexStream<String> inputStream)
+    {
+      // Concept #1: The default triggering behavior
+      // By default Dataflow uses a trigger which fires when the watermark has passed the end of the
+      // window. This would be written {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}.
+
+      // The system also defaults to dropping late data -- data which arrives after the watermark
+      // has passed the event timestamp of the arriving element. This means that the default trigger
+      // will only fire once.
+
+      // Each pane produced by the default trigger with no allowed lateness will be the first and
+      // last pane in the window, and will be ON_TIME.
+
+      // The results for the example above with the default trigger and zero allowed lateness
+      // would be:
+      // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing
+      // 5             | 260               | 6               | true    | true   | ON_TIME
+
+      // At 11:03:00 (processing time) the system watermark may have advanced to 10:54:00. As a
+      // result, when the data record with event time 10:05:00 arrives at 11:03:00, it is considered
+      // late, and dropped.
+
+      WindowedStream<SampleBean> defaultTriggerResults = inputStream
+          .window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)),
+          new TriggerOption().discardingFiredPanes())
+          .addCompositeStreams(new TotalFlow("default"));
+
+      // Concept #2: Late data with the default trigger
+      // This uses the same trigger as concept #1, but allows data that is up to ONE_DAY late. This
+      // leads to each window staying open for ONE_DAY after the watermark has passed the end of the
+      // window. Any late data will result in an additional pane being fired for that same window.
+
+      // The first pane produced will be ON_TIME and the remaining panes will be LATE.
+      // To definitely get the last pane when the window closes, use
+      // .withAllowedLateness(ONE_DAY, ClosingBehavior.FIRE_ALWAYS).
+
+      // The results for the example above with the default trigger and ONE_DAY allowed lateness
+      // would be:
+      // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing
+      // 5             | 260               | 6               | true    | false  | ON_TIME
+      // 5             | 60                | 1               | false   | false  | LATE
+      // 5             | 30                | 1               | false   | false  | LATE
+      // 5             | 20                | 1               | false   | false  | LATE
+      // 5             | 60                | 1               | false   | false  | LATE
+      WindowedStream<SampleBean> withAllowedLatenessResults = inputStream
+          .window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)),
+          new TriggerOption().discardingFiredPanes(),
+          Duration.standardDays(1))
+          .addCompositeStreams(new TotalFlow("withAllowedLateness"));
+
+      // Concept #3: How to get speculative estimates
+      // We can specify a trigger that fires independent of the watermark, for instance after
+      // ONE_MINUTE of processing time. This allows us to produce speculative estimates before
+      // all the data is available. Since we don't have any triggers that depend on the watermark
+      // we don't get an ON_TIME firing. Instead, all panes are either EARLY or LATE.
+
+      // We also use accumulatingFiredPanes to build up the results across each pane firing.
+
+      // The results for the example above for this trigger would be:
+      // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing
+      // 5             | 80                | 2               | true    | false  | EARLY
+      // 5             | 100               | 3               | false   | false  | EARLY
+      // 5             | 260               | 6               | false   | false  | EARLY
+      // 5             | 320               | 7               | false   | false  | LATE
+      // 5             | 370               | 9               | false   | false  | LATE
+      // 5             | 430               | 10              | false   | false  | LATE
+
+      ApexStream<SampleBean> speculativeResults = inputStream
+          .window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)),
+          // Trigger fires every minute
+          new TriggerOption().withEarlyFiringsAtEvery(Duration.standardMinutes(1))
+          // After emitting each pane, it will continue accumulating the elements so that each
+          // approximation includes all of the previous data in addition to the newly arrived
+          // data.
+          .accumulatingFiredPanes(),
+          Duration.standardDays(1))
+          .addCompositeStreams(new TotalFlow("speculative"));
+
+      // Concept #4: Combining late data and speculative estimates
+      // We can put the previous concepts together to get EARLY estimates, an ON_TIME result,
+      // and LATE updates based on late data.
+
+      // Each time a triggering condition is satisfied, it advances to the next trigger.
+      // If there are new elements, this trigger emits a window under the following conditions:
+      // > Early approximations every minute till the end of the window.
+      // > An on-time firing when the watermark has passed the end of the window.
+      // > Every five minutes of late data.
+
+      // Every pane produced will either be EARLY, ON_TIME or LATE.
+
+      // The results for the example above for this trigger would be:
+      // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing
+      // 5             | 80                | 2               | true    | false  | EARLY
+      // 5             | 100               | 3               | false   | false  | EARLY
+      // 5             | 260               | 6               | false   | false  | EARLY
+      // [First pane fired after the end of the window]
+      // 5             | 320               | 7               | false   | false  | ON_TIME
+      // 5             | 430               | 10              | false   | false  | LATE
+
+      // For more possibilities of how to build advanced triggers, see {@link Trigger}.
+      WindowedStream<SampleBean> sequentialResults = inputStream
+          .window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)),
+          // Speculative every ONE_MINUTE
+          new TriggerOption().withEarlyFiringsAtEvery(Duration.standardMinutes(1))
+          .withLateFiringsAtEvery(Duration.standardMinutes(5))
+          // After emitting each pane, it will continue accumulating the elements so that each
+          // approximation includes all of the previous data in addition to the newly arrived
+          // data.
+          .accumulatingFiredPanes(),
+          Duration.standardDays(1))
+          .addCompositeStreams(new TotalFlow("sequential"));
+
+      return sequentialResults;
+    }
+
+  }
+
+  //////////////////////////////////////////////////////////////////////////////////////////////////
+  // The remaining parts of the pipeline are needed to produce the output for each
+  // concept above. Not directly relevant to understanding the trigger examples.
+
+  /**
+   * Calculate total flow and number of records for each freeway and format the results to TableRow
+   * objects, to save to BigQuery.
+   */
+  static class TotalFlow extends
+      CompositeStreamTransform<WindowedStream<String>, WindowedStream<SampleBean>>
+  {
+    private String triggerType;
+
+    public TotalFlow(String triggerType)
+    {
+      this.triggerType = triggerType;
+    }
+
+    @Override
+    public WindowedStream<SampleBean> compose(WindowedStream<String> inputStream)
+    {
+
+      WindowedStream<KeyValPair<String, Iterable<Integer>>> flowPerFreeway = inputStream
+          .groupByKey(new ExtractFlowInfo());
+
+      return flowPerFreeway
+          .map(new Function.MapFunction<KeyValPair<String, Iterable<Integer>>, KeyValPair<String, String>>()
+          {
+            @Override
+            public KeyValPair<String, String> f(KeyValPair<String, Iterable<Integer>> input)
+            {
+              Iterable<Integer> flows = input.getValue();
+              Integer sum = 0;
+              Long numberOfRecords = 0L;
+              for (Integer value : flows) {
+                sum += value;
+                numberOfRecords++;
+              }
+              return new KeyValPair<>(input.getKey(), sum + "," + numberOfRecords);
+            }
+          })
+          .map(new FormatTotalFlow(triggerType));
+    }
+  }
+
+  /**
+   * Format the results of the Total flow calculation to a TableRow, to save to BigQuery.
+   * Adds the triggerType, pane information, processing time and the window timestamp.
+   */
+  static class FormatTotalFlow implements Function.MapFunction<KeyValPair<String, String>, SampleBean>
+  {
+    private String triggerType;
+
+    public FormatTotalFlow(String triggerType)
+    {
+      this.triggerType = triggerType;
+    }
+
+    @Override
+    public SampleBean f(KeyValPair<String, String> input)
+    {
+      String[] values = input.getValue().split(",");
+      //TODO need to have a callback to get the metadata like window id, pane id, timestamps etc.
+      return new SampleBean(triggerType, input.getKey(), Integer.parseInt(values[0]), Long
+          .parseLong(values[1]), null, false, false, null, null, new Date());
+    }
+  }
+
+  public static class SampleBean
+  {
+    public SampleBean()
+    {
+    }
+
+    private String triggerType;
+
+    private String freeway;
+
+    private int totalFlow;
+
+    private long numberOfRecords;
+
+    private String window;
+
+    private boolean isFirst;
+
+    private boolean isLast;
+
+    private Date timing;
+
+    private Date eventTime;
+
+    private Date processingTime;
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      SampleBean that = (SampleBean)o;
+      return totalFlow == that.totalFlow &&
+          numberOfRecords == that.numberOfRecords &&
+          isFirst == that.isFirst &&
+          isLast == that.isLast &&
+          Objects.equals(triggerType, that.triggerType) &&
+          Objects.equals(freeway, that.freeway) &&
+          Objects.equals(window, that.window) &&
+          Objects.equals(timing, that.timing) &&
+          Objects.equals(eventTime, that.eventTime) &&
+          Objects.equals(processingTime, that.processingTime);
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects
+          .hash(triggerType, freeway, totalFlow, numberOfRecords, window, isFirst, isLast, timing, eventTime,
+          processingTime);
+    }
+
+    public SampleBean(String triggerType, String freeway, int totalFlow, long numberOfRecords, String window, boolean isFirst, boolean isLast, Date timing, Date eventTime, Date processingTime)
+    {
+
+      this.triggerType = triggerType;
+      this.freeway = freeway;
+      this.totalFlow = totalFlow;
+      this.numberOfRecords = numberOfRecords;
+      this.window = window;
+      this.isFirst = isFirst;
+      this.isLast = isLast;
+      this.timing = timing;
+      this.eventTime = eventTime;
+      this.processingTime = processingTime;
+    }
+
+    public String getTriggerType()
+    {
+      return triggerType;
+    }
+
+    public void setTriggerType(String triggerType)
+    {
+      this.triggerType = triggerType;
+    }
+
+    public String getFreeway()
+    {
+      return freeway;
+    }
+
+    public void setFreeway(String freeway)
+    {
+      this.freeway = freeway;
+    }
+
+    public int getTotalFlow()
+    {
+      return totalFlow;
+    }
+
+    public void setTotalFlow(int totalFlow)
+    {
+      this.totalFlow = totalFlow;
+    }
+
+    public long getNumberOfRecords()
+    {
+      return numberOfRecords;
+    }
+
+    public void setNumberOfRecords(long numberOfRecords)
+    {
+      this.numberOfRecords = numberOfRecords;
+    }
+
+    public String getWindow()
+    {
+      return window;
+    }
+
+    public void setWindow(String window)
+    {
+      this.window = window;
+    }
+
+    public boolean isFirst()
+    {
+      return isFirst;
+    }
+
+    public void setFirst(boolean first)
+    {
+      isFirst = first;
+    }
+
+    public boolean isLast()
+    {
+      return isLast;
+    }
+
+    public void setLast(boolean last)
+    {
+      isLast = last;
+    }
+
+    public Date getTiming()
+    {
+      return timing;
+    }
+
+    public void setTiming(Date timing)
+    {
+      this.timing = timing;
+    }
+
+    public Date getEventTime()
+    {
+      return eventTime;
+    }
+
+    public void setEventTime(Date eventTime)
+    {
+      this.eventTime = eventTime;
+    }
+
+    public Date getProcessingTime()
+    {
+      return processingTime;
+    }
+
+    public void setProcessingTime(Date processingTime)
+    {
+      this.processingTime = processingTime;
+    }
+  }
+
+  /**
+   * Extract the freeway and total flow in a reading.
+   * Freeway is used as the key since we are calculating the total flow for each freeway.
+   */
+  static class ExtractFlowInfo implements Function.ToKeyValue<String, String, Integer>
+  {
+    @Override
+    public Tuple<KeyValPair<String, Integer>> f(String input)
+    {
+      String[] laneInfo = input.split(",");
+      if (laneInfo[0].equals("timestamp")) {
+        // Header row
+        return null;
+      }
+      if (laneInfo.length < 48) {
+        // Skip the invalid input.
+        return null;
+      }
+      String freeway = laneInfo[2];
+      Integer totalFlow = tryIntegerParse(laneInfo[7]);
+      // Ignore the records with total flow 0 to easily understand the working of triggers.
+      // Skip the records with total flow -1 since they are invalid input.
+      if (totalFlow == null || totalFlow <= 0) {
+        return null;
+      }
+      return new Tuple.PlainTuple<>(new KeyValPair<>(freeway, totalFlow));
+    }
+  }
+
+  private static final String PUBSUB_TIMESTAMP_LABEL_KEY = "timestamp_ms";
+
+  public static void main(String[] args) throws Exception
+  {
+    StreamFactory.fromFolder("some folder")
+        .addCompositeStreams(new CalculateTotalFlow(60));
+
+  }
+
+  private static Integer tryIntegerParse(String number)
+  {
+    try {
+      return Integer.parseInt(number);
+    } catch (NumberFormatException e) {
+      return null;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a017dfaa/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java
----------------------------------------------------------------------
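
To isolate the trigger wiring exercised by the "sequential" case in TriggerExample above, a minimal sketch built from the same high-level API calls that appear in the diff (the input path and class name are placeholders):

import org.joda.time.Duration;

import org.apache.apex.malhar.lib.window.TriggerOption;
import org.apache.apex.malhar.lib.window.WindowOption;
import org.apache.apex.malhar.stream.api.ApexStream;
import org.apache.apex.malhar.stream.api.WindowedStream;
import org.apache.apex.malhar.stream.api.impl.StreamFactory;

public class SequentialTriggerSketch
{
  // Fixed 30-minute event-time windows with speculative firings every minute,
  // late firings every five minutes, accumulating panes, and one day of
  // allowed lateness -- mirroring the "sequential" stream above.
  public static WindowedStream<String> windowedLines()
  {
    ApexStream<String> lines = StreamFactory.fromFolder("/path/to/readings"); // placeholder input
    return lines.window(
        new WindowOption.TimeWindows(Duration.standardMinutes(30)),
        new TriggerOption()
            .withEarlyFiringsAtEvery(Duration.standardMinutes(1))
            .withLateFiringsAtEvery(Duration.standardMinutes(5))
            .accumulatingFiredPanes(),
        Duration.standardDays(1));
  }
}
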
