APEXMALHAR-2142 #comment Implement WindowedStream interface
Add Accumulation interface support to High-Level API
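
For orientation, the new windowed API can be exercised roughly as follows. This is an illustrative sketch only, not part of the patch: SampleInput and ExtractLargeWordsFn are the classes added in CombinePerKeyExamples below, the operator names passed via Option.Options.name are arbitrary, and treating Tuple.PlainTuple as the plain Tuple implementation from malhar-library's window support is an assumption.

    // Sketch: count occurrences per word key with the windowed High-Level API.
    // Assumes the imports used by CombinePerKeyExamples below plus
    // org.apache.apex.malhar.lib.window.Tuple.
    public static void main(String[] args) throws Exception
    {
      SampleInput input = new SampleInput();
      StreamFactory.fromInput(input, input.beanOutput)
          // named function class added in this patch: emits (word, playName) pairs
          .map(new ExtractLargeWordsFn(), Option.Options.name("extractWords"))
          // one global window with speculative firings every minute, accumulating panes
          .window(new WindowOption.GlobalWindow(),
              new TriggerOption().withEarlyFiringsAtEvery(Duration.standardMinutes(1)).accumulatingFiredPanes())
          // countByKey needs a ToKeyValue function that maps each tuple to a (key, 1L) pair
          .countByKey(new Function.ToKeyValue<KeyValPair<String, String>, String, Long>()
          {
            @Override
            public Tuple<KeyValPair<String, Long>> f(KeyValPair<String, String> wordAndPlay)
            {
              // assumption: Tuple.PlainTuple wraps a plain (un-windowed) value
              return new Tuple.PlainTuple<>(new KeyValPair<>(wordAndPlay.getKey(), 1L));
            }
          }, Option.Options.name("countWords"))
          .run();
    }

The anonymous ToKeyValue class mirrors the style of the cookbook examples in this patch; per the Function javadoc below, such anonymous functions must be stateless, and a top-level function class is the safer choice.
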
Add name support for all the windowed transforms Flex types in Composite transformation Add more documents and logs Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/266b0411 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/266b0411 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/266b0411 Branch: refs/heads/master Commit: 266b04116760dbd4d5cad6b4102b06153ac96a5f Parents: 17f6c55 Author: Siyuan Hua <[email protected]> Authored: Tue Jul 12 11:57:09 2016 -0700 Committer: Siyuan Hua <[email protected]> Committed: Thu Aug 25 09:26:03 2016 -0700 ---------------------------------------------------------------------- demos/highlevelapi/pom.xml | 55 ++ demos/highlevelapi/src/assemble/appPackage.xml | 59 ++ .../sample/cookbook/CombinePerKeyExamples.java | 237 ++++++++ .../stream/sample/cookbook/TriggerExample.java | 578 +++++++++++++++++++ demos/pom.xml | 12 + stream/pom.xml | 1 - .../apex/malhar/stream/api/ApexStream.java | 144 ++--- .../stream/api/CompositeStreamTransform.java | 30 + .../apache/apex/malhar/stream/api/Option.java | 122 ++++ .../apex/malhar/stream/api/WindowedStream.java | 150 +++++ .../malhar/stream/api/function/Function.java | 36 +- .../malhar/stream/api/impl/ApexStreamImpl.java | 194 +++---- .../stream/api/impl/ApexWindowedStreamImpl.java | 275 +++++++++ .../apex/malhar/stream/api/impl/DagMeta.java | 53 +- .../malhar/stream/api/impl/IDGenerator.java | 3 + .../malhar/stream/api/impl/StreamFactory.java | 76 ++- .../stream/api/impl/TupleWrapperOperator.java | 192 ++++++ .../stream/api/impl/accumulation/Count.java | 61 ++ .../stream/api/impl/accumulation/FoldFn.java | 65 +++ .../stream/api/impl/accumulation/ReduceFn.java | 65 +++ .../stream/api/impl/accumulation/TopN.java | 107 ++++ .../api/operator/AnnonymousClassModifier.java | 3 + .../api/operator/ByteArrayClassLoader.java | 3 + .../stream/api/operator/FunctionOperator.java | 112 ++-- .../apex/malhar/stream/api/util/KeyedTuple.java | 34 -- .../apex/malhar/stream/api/util/TupleUtil.java | 21 +- .../FunctionOperator/FunctionOperatorTest.java | 4 +- .../stream/api/impl/ApexStreamImplTest.java | 6 +- .../stream/sample/ApplicationWithStreamAPI.java | 21 +- .../LocalTestWithoutStreamApplication.java | 54 +- .../apex/malhar/stream/sample/MyStream.java | 3 +- .../apex/malhar/stream/sample/MyStreamTest.java | 87 ++- .../malhar/stream/sample/TupleCollector.java | 2 + .../apex/malhar/stream/sample/WCInput.java | 90 +++ .../stream/sample/WordCountWithStreamAPI.java | 72 +++ stream/src/test/resources/sampletweets.txt | 207 +++++++ 36 files changed, 2839 insertions(+), 395 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/demos/highlevelapi/pom.xml ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/pom.xml b/demos/highlevelapi/pom.xml new file mode 100644 index 0000000..c669681 --- /dev/null +++ b/demos/highlevelapi/pom.xml @@ -0,0 +1,55 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <artifactId>high-level-api-demo</artifactId> + <packaging>jar</packaging> + + <name>Apache Apex Malhar High-Level API Demo</name> + <description>Apex demo applications that use High-level API to construct a dag</description> + + <parent> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-demos</artifactId> + <version>3.5.0-SNAPSHOT</version> + </parent> + + <properties> + <skipTests>true</skipTests> + </properties> + + <dependencies> + <dependency> + <groupId>cglib</groupId> + <artifactId>cglib</artifactId> + <version>3.2.1</version> + </dependency> + <dependency> + <groupId>joda-time</groupId> + <artifactId>joda-time</artifactId> + <version>2.9.1</version> + </dependency> + </dependencies> + + +</project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/demos/highlevelapi/src/assemble/appPackage.xml ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/assemble/appPackage.xml b/demos/highlevelapi/src/assemble/appPackage.xml new file mode 100644 index 0000000..4138cf2 --- /dev/null +++ b/demos/highlevelapi/src/assemble/appPackage.xml @@ -0,0 +1,59 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> +<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> + <id>appPackage</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <fileSets> + <fileSet> + <directory>${basedir}/target/</directory> + <outputDirectory>/app</outputDirectory> + <includes> + <include>${project.artifactId}-${project.version}.jar</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/target/deps</directory> + <outputDirectory>/lib</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/site/conf</directory> + <outputDirectory>/conf</outputDirectory> + <includes> + <include>*.xml</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/META-INF</directory> + <outputDirectory>/META-INF</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/app</directory> + <outputDirectory>/app</outputDirectory> + </fileSet> + </fileSets> + +</assembly> + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java new file mode 100644 index 0000000..5d4c628 --- /dev/null +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java @@ -0,0 +1,237 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.stream.sample.cookbook; + +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.WindowOption; +import org.apache.apex.malhar.stream.api.ApexStream; +import org.apache.apex.malhar.stream.api.CompositeStreamTransform; +import org.apache.apex.malhar.stream.api.function.Function; +import org.apache.apex.malhar.stream.api.impl.StreamFactory; +import org.apache.apex.malhar.stream.api.impl.accumulation.ReduceFn; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.lib.util.KeyValPair; + +/** + * An example that reads the public 'Shakespeare' data, and for each word in + * the dataset that is over a given length, generates a string containing the + * list of play names in which that word appears + * + * <p>Concepts: the combine transform, which lets you combine the values in a + * key-grouped Collection + * + */ +public class CombinePerKeyExamples +{ + // Use the shakespeare public BigQuery sample + private static final String SHAKESPEARE_TABLE = "publicdata:samples.shakespeare"; + // We'll track words >= this word length across all plays in the table. + private static final int MIN_WORD_LENGTH = 9; + + /** + * Examines each row in the input table. If the word is greater than or equal to MIN_WORD_LENGTH, + * outputs word, play_name. + */ + static class ExtractLargeWordsFn implements Function.MapFunction<SampleBean, KeyValPair<String, String>> + { + + @Override + public KeyValPair<String, String> f(SampleBean input) + { + String playName = input.getCorpus(); + String word = input.getWord(); + if (word.length() >= MIN_WORD_LENGTH) { + return new KeyValPair<>(word, playName); + } else { + return null; + } + } + } + + + /** + * Prepares the output data which is in same bean + */ + static class FormatShakespeareOutputFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, String>>, SampleBean> + { + @Override + public SampleBean f(Tuple.WindowedTuple<KeyValPair<String, String>> input) + { + return new SampleBean(input.getValue().getKey(), input.getValue().getValue(), null); + } + } + + /** + * Reads the public 'Shakespeare' data, and for each word in the dataset + * over a given length, generates a string containing the list of play names + * in which that word appears. 
+ */ + static class PlaysForWord + extends CompositeStreamTransform<SampleBean, SampleBean> + { + + @Override + public ApexStream<SampleBean> compose(ApexStream<SampleBean> inputStream) + { + // fix this later + return inputStream.map(new ExtractLargeWordsFn()) + .window(new WindowOption.GlobalWindow()) + .reduceByKey(new ReduceFn<String>() + { + @Override + public String defaultAccumulatedValue() + { + return ""; + } + + @Override + public String accumulate(String accumulatedValue, String input) + { + return accumulatedValue + "," + input; + } + + @Override + public String merge(String accumulatedValue1, String accumulatedValue2) + { + return accumulatedValue1 + "," + accumulatedValue2; + } + + @Override + public String getOutput(String accumulatedValue) + { + return accumulatedValue; + } + + @Override + public String getRetraction(String value) + { + return value; + } + }, new Function.MapFunction<KeyValPair<String, String>, Tuple<KeyValPair<String, String>>>() + + { + @Override + public Tuple<KeyValPair<String, String>> f(KeyValPair<String, String> input) + { + return null; + } + }) + .map(new FormatShakespeareOutputFn()); + } + } + + + public static class SampleBean + { + + public SampleBean() + { + + } + + public SampleBean(String word, String all_plays, String corpus) + { + this.word = word; + this.all_plays = all_plays; + this.corpus = corpus; + } + + private String word; + + private String all_plays; + + private String corpus; + + public void setWord(String word) + { + this.word = word; + } + + public String getWord() + { + return word; + } + + public void setCorpus(String corpus) + { + this.corpus = corpus; + } + + public String getCorpus() + { + return corpus; + } + + public void setAll_plays(String all_plays) + { + this.all_plays = all_plays; + } + + public String getAll_plays() + { + return all_plays; + } + } + + public static class SampleInput implements InputOperator + { + + public final transient DefaultOutputPort<SampleBean> beanOutput = new DefaultOutputPort(); + + @Override + public void emitTuples() + { + + } + + @Override + public void beginWindow(long l) + { + + } + + @Override + public void endWindow() + { + + } + + @Override + public void setup(Context.OperatorContext context) + { + + } + + @Override + public void teardown() + { + + } + } + + + public static void main(String[] args) throws Exception + { + SampleInput input = new SampleInput(); + StreamFactory.fromInput(input, input.beanOutput).addCompositeStreams(new PlaysForWord()); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java new file mode 100644 index 0000000..903f624 --- /dev/null +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java @@ -0,0 +1,578 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.sample.cookbook; + +import java.util.Date; +import java.util.Objects; + +import org.joda.time.Duration; + +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.WindowOption; +import org.apache.apex.malhar.stream.api.ApexStream; +import org.apache.apex.malhar.stream.api.CompositeStreamTransform; +import org.apache.apex.malhar.stream.api.WindowedStream; +import org.apache.apex.malhar.stream.api.function.Function; +import org.apache.apex.malhar.stream.api.impl.StreamFactory; + +import com.datatorrent.lib.util.KeyValPair; + +/** + * This example illustrates the basic concepts behind triggering. It shows how to use different + * trigger definitions to produce partial (speculative) results before all the data is processed and + * to control when updated results are produced for late data. The example performs a streaming + * analysis of the data coming in from PubSub and writes the results to BigQuery. It divides the + * data into {@link Window windows} to be processed, and demonstrates using various kinds of + * {@link org.apache.beam.sdk.transforms.windowing.Trigger triggers} to control when the results for + * each window are emitted. + * + * <p> This example uses a portion of real traffic data from San Diego freeways. It contains + * readings from sensor stations set up along each freeway. Each sensor reading includes a + * calculation of the 'total flow' across all lanes in that freeway direction. + * + * <p> Concepts: + * <pre> + * 1. The default triggering behavior + * 2. Late data with the default trigger + * 3. How to get speculative estimates + * 4. Combining late data and speculative estimates + * </pre> + * + * <p> Before running this example, it will be useful to familiarize yourself with Dataflow triggers + * and understand the concept of 'late data', + * See: <a href="https://cloud.google.com/dataflow/model/triggers"> + * https://cloud.google.com/dataflow/model/triggers </a> and + * <a href="https://cloud.google.com/dataflow/model/windowing#Advanced"> + * https://cloud.google.com/dataflow/model/windowing#Advanced </a> + * + * <p> The example pipeline reads data from a Pub/Sub topic. By default, running the example will + * also run an auxiliary pipeline to inject data from the default {@code --input} file to the + * {@code --pubsubTopic}. The auxiliary pipeline puts a timestamp on the injected data so that the + * example pipeline can operate on <i>event time</i> (rather than arrival time). The auxiliary + * pipeline also randomly simulates late data, by setting the timestamps of some of the data + * elements to be in the past. You may override the default {@code --input} with the file of your + * choosing or set {@code --input=""} which will disable the automatic Pub/Sub injection, and allow + * you to use a separate tool to publish to the given topic. + * + * <p> The example is configured to use the default Pub/Sub topic and the default BigQuery table + * from the example common package (there are no defaults for a general Dataflow pipeline). 
+ * You can override them by using the {@code --pubsubTopic}, {@code --bigQueryDataset}, and + * {@code --bigQueryTable} options. If the Pub/Sub topic or the BigQuery table do not exist, + * the example will try to create them. + * + * <p> The pipeline outputs its results to a BigQuery table. + * Here are some queries you can use to see interesting results: + * Replace {@code <enter_table_name>} in the query below with the name of the BigQuery table. + * Replace {@code <enter_window_interval>} in the query below with the window interval. + * + * <p> To see the results of the default trigger, + * Note: When you start up your pipeline, you'll initially see results from 'late' data. Wait after + * the window duration, until the first pane of non-late data has been emitted, to see more + * interesting results. + * {@code SELECT * FROM enter_table_name WHERE trigger_type = "default" ORDER BY window DESC} + * + * <p> To see the late data i.e. dropped by the default trigger, + * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "withAllowedLateness" and + * (timing = "LATE" or timing = "ON_TIME") and freeway = "5" ORDER BY window DESC, processing_time} + * + * <p>To see the the difference between accumulation mode and discarding mode, + * {@code SELECT * FROM <enter_table_name> WHERE (timing = "LATE" or timing = "ON_TIME") AND + * (trigger_type = "withAllowedLateness" or trigger_type = "sequential") and freeway = "5" ORDER BY + * window DESC, processing_time} + * + * <p> To see speculative results every minute, + * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "speculative" and freeway = "5" + * ORDER BY window DESC, processing_time} + * + * <p> To see speculative results every five minutes after the end of the window + * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "sequential" and timing != "EARLY" + * and freeway = "5" ORDER BY window DESC, processing_time} + * + * <p> To see the first and the last pane for a freeway in a window for all the trigger types, + * {@code SELECT * FROM <enter_table_name> WHERE (isFirst = true or isLast = true) ORDER BY window} + * + * <p> To reduce the number of results for each query we can add additional where clauses. + * For examples, To see the results of the default trigger, + * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "default" AND freeway = "5" AND + * window = "<enter_window_interval>"} + * + * <p> The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C) + * and then exits. + */ + +public class TriggerExample +{ + //Numeric value of fixed window duration, in minutes + public static final int WINDOW_DURATION = 30; + // Constants used in triggers. + // Speeding up ONE_MINUTE or FIVE_MINUTES helps you get an early approximation of results. + // ONE_MINUTE is used only with processing time before the end of the window + public static final Duration ONE_MINUTE = Duration.standardMinutes(1); + // FIVE_MINUTES is used only with processing time after the end of the window + public static final Duration FIVE_MINUTES = Duration.standardMinutes(5); + // ONE_DAY is used to specify the amount of lateness allowed for the data elements. + public static final Duration ONE_DAY = Duration.standardDays(1); + + /** + * This transform demonstrates using triggers to control when data is produced for each window + * Consider an example to understand the results generated by each type of trigger. + * The example uses "freeway" as the key. 
Event time is the timestamp associated with the data + * element and processing time is the time when the data element gets processed in the pipeline. + * For freeway 5, suppose there are 10 elements in the [10:00:00, 10:30:00) window. + * Key (freeway) | Value (total_flow) | event time | processing time + * 5 | 50 | 10:00:03 | 10:00:47 + * 5 | 30 | 10:01:00 | 10:01:03 + * 5 | 30 | 10:02:00 | 11:07:00 + * 5 | 20 | 10:04:10 | 10:05:15 + * 5 | 60 | 10:05:00 | 11:03:00 + * 5 | 20 | 10:05:01 | 11.07:30 + * 5 | 60 | 10:15:00 | 10:27:15 + * 5 | 40 | 10:26:40 | 10:26:43 + * 5 | 60 | 10:27:20 | 10:27:25 + * 5 | 60 | 10:29:00 | 11:11:00 + * + * <p> Dataflow tracks a watermark which records up to what point in event time the data is + * complete. For the purposes of the example, we'll assume the watermark is approximately 15m + * behind the current processing time. In practice, the actual value would vary over time based + * on the systems knowledge of the current PubSub delay and contents of the backlog (data + * that has not yet been processed). + * + * <p> If the watermark is 15m behind, then the window [10:00:00, 10:30:00) (in event time) would + * close at 10:44:59, when the watermark passes 10:30:00. + */ + static class CalculateTotalFlow + extends CompositeStreamTransform<String, SampleBean> + { + private int windowDuration; + + CalculateTotalFlow(int windowDuration) + { + this.windowDuration = windowDuration; + } + + @Override + public ApexStream<SampleBean> compose(ApexStream<String> inputStream) + { + // Concept #1: The default triggering behavior + // By default Dataflow uses a trigger which fires when the watermark has passed the end of the + // window. This would be written {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}. + + // The system also defaults to dropping late data -- data which arrives after the watermark + // has passed the event timestamp of the arriving element. This means that the default trigger + // will only fire once. + + // Each pane produced by the default trigger with no allowed lateness will be the first and + // last pane in the window, and will be ON_TIME. + + // The results for the example above with the default trigger and zero allowed lateness + // would be: + // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing + // 5 | 260 | 6 | true | true | ON_TIME + + // At 11:03:00 (processing time) the system watermark may have advanced to 10:54:00. As a + // result, when the data record with event time 10:05:00 arrives at 11:03:00, it is considered + // late, and dropped. + + ApexStream<SampleBean> defaultTriggerResults = inputStream + .window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)), + new TriggerOption().discardingFiredPanes()) + .addCompositeStreams(new TotalFlow("default")); + + // Concept #2: Late data with the default trigger + // This uses the same trigger as concept #1, but allows data that is up to ONE_DAY late. This + // leads to each window staying open for ONE_DAY after the watermark has passed the end of the + // window. Any late data will result in an additional pane being fired for that same window. + + // The first pane produced will be ON_TIME and the remaining panes will be LATE. + // To definitely get the last pane when the window closes, use + // .withAllowedLateness(ONE_DAY, ClosingBehavior.FIRE_ALWAYS). 
+ + // The results for the example above with the default trigger and ONE_DAY allowed lateness + // would be: + // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing + // 5 | 260 | 6 | true | false | ON_TIME + // 5 | 60 | 1 | false | false | LATE + // 5 | 30 | 1 | false | false | LATE + // 5 | 20 | 1 | false | false | LATE + // 5 | 60 | 1 | false | false | LATE + ApexStream<SampleBean> withAllowedLatenessResults = inputStream + .window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)), + new TriggerOption().discardingFiredPanes(), + Duration.standardDays(1)) + .addCompositeStreams(new TotalFlow("withAllowedLateness")); + + // Concept #3: How to get speculative estimates + // We can specify a trigger that fires independent of the watermark, for instance after + // ONE_MINUTE of processing time. This allows us to produce speculative estimates before + // all the data is available. Since we don't have any triggers that depend on the watermark + // we don't get an ON_TIME firing. Instead, all panes are either EARLY or LATE. + + // We also use accumulatingFiredPanes to build up the results across each pane firing. + + // The results for the example above for this trigger would be: + // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing + // 5 | 80 | 2 | true | false | EARLY + // 5 | 100 | 3 | false | false | EARLY + // 5 | 260 | 6 | false | false | EARLY + // 5 | 320 | 7 | false | false | LATE + // 5 | 370 | 9 | false | false | LATE + // 5 | 430 | 10 | false | false | LATE + + ApexStream<SampleBean> speculativeResults = inputStream + .window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)), + //Trigger fires every minute + new TriggerOption().withEarlyFiringsAtEvery(Duration.standardMinutes(1)) + // After emitting each pane, it will continue accumulating the elements so that each + // approximation includes all of the previous data in addition to the newly arrived + // data. + .accumulatingFiredPanes(), + Duration.standardDays(1)) + .addCompositeStreams(new TotalFlow("speculative")); + + // Concept #4: Combining late data and speculative estimates + // We can put the previous concepts together to get EARLY estimates, an ON_TIME result, + // and LATE updates based on late data. + + // Each time a triggering condition is satisfied it advances to the next trigger. + // If there are new elements this trigger emits a window under following condition: + // > Early approximations every minute till the end of the window. + // > An on-time firing when the watermark has passed the end of the window + // > Every five minutes of late data. + + // Every pane produced will either be EARLY, ON_TIME or LATE. + + // The results for the example above for this trigger would be: + // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing + // 5 | 80 | 2 | true | false | EARLY + // 5 | 100 | 3 | false | false | EARLY + // 5 | 260 | 6 | false | false | EARLY + // [First pane fired after the end of the window] + // 5 | 320 | 7 | false | false | ON_TIME + // 5 | 430 | 10 | false | false | LATE + + // For more possibilities of how to build advanced triggers, see {@link Trigger}. 
+ ApexStream<SampleBean> sequentialResults = inputStream + .window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)), + // Speculative every ONE_MINUTE + new TriggerOption().withEarlyFiringsAtEvery(Duration.standardMinutes(1)) + .withLateFiringsAtEvery(Duration.standardMinutes(5)) + // After emitting each pane, it will continue accumulating the elements so that each + // approximation includes all of the previous data in addition to the newly arrived + // data. + .accumulatingFiredPanes(), + Duration.standardDays(1)) + .addCompositeStreams(new TotalFlow("sequential")); + + return sequentialResults; + } + + } + + ////////////////////////////////////////////////////////////////////////////////////////////////// + // The remaining parts of the pipeline are needed to produce the output for each + // concept above. Not directly relevant to understanding the trigger examples. + + /** + * Calculate total flow and number of records for each freeway and format the results to TableRow + * objects, to save to BigQuery. + */ + static class TotalFlow extends + CompositeStreamTransform<String, SampleBean> + { + private String triggerType; + + public TotalFlow(String triggerType) + { + this.triggerType = triggerType; + } + + @Override + public ApexStream<SampleBean> compose(ApexStream<String> inputStream) + { + if (!(inputStream instanceof WindowedStream)) { + throw new RuntimeException("Not supported here"); + } + WindowedStream<String> windowedStream = (WindowedStream<String>)inputStream; + ApexStream<KeyValPair<String, Iterable<Integer>>> flowPerFreeway = windowedStream + .groupByKey(new ExtractFlowInfo()); + + return flowPerFreeway + .map(new Function.MapFunction<KeyValPair<String, Iterable<Integer>>, KeyValPair<String, String>>() + { + @Override + public KeyValPair<String, String> f(KeyValPair<String, Iterable<Integer>> input) + { + Iterable<Integer> flows = input.getValue(); + Integer sum = 0; + Long numberOfRecords = 0L; + for (Integer value : flows) { + sum += value; + numberOfRecords++; + } + return new KeyValPair<>(input.getKey(), sum + "," + numberOfRecords); + } + }) + .map(new FormatTotalFlow(triggerType)); + } + } + + /** + * Format the results of the Total flow calculation to a TableRow, to save to BigQuery. + * Adds the triggerType, pane information, processing time and the window timestamp. + */ + static class FormatTotalFlow implements Function.MapFunction<KeyValPair<String, String>, SampleBean> + { + private String triggerType; + + public FormatTotalFlow(String triggerType) + { + this.triggerType = triggerType; + } + + @Override + public SampleBean f(KeyValPair<String, String> input) + { + String[] values = input.getValue().split(","); + //TODO need to have a callback to get the metadata like window id, pane id, timestamps etc. 
+ return new SampleBean(triggerType, input.getKey(), Integer.parseInt(values[0]), Long + .parseLong(values[1]), null, false, false, null, null, new Date()); + } + } + + public static class SampleBean + { + public SampleBean() + { + } + + private String trigger_type; + + private String freeway; + + private int total_flow; + + private long number_of_records; + + private String window; + + private boolean isFirst; + + private boolean isLast; + + private Date timing; + + private Date event_time; + + private Date processing_time; + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SampleBean that = (SampleBean)o; + return total_flow == that.total_flow && + number_of_records == that.number_of_records && + isFirst == that.isFirst && + isLast == that.isLast && + Objects.equals(trigger_type, that.trigger_type) && + Objects.equals(freeway, that.freeway) && + Objects.equals(window, that.window) && + Objects.equals(timing, that.timing) && + Objects.equals(event_time, that.event_time) && + Objects.equals(processing_time, that.processing_time); + } + + @Override + public int hashCode() + { + return Objects + .hash(trigger_type, freeway, total_flow, number_of_records, window, isFirst, isLast, timing, event_time, + processing_time); + } + + public SampleBean(String trigger_type, String freeway, int total_flow, long number_of_records, String window, + boolean isFirst, boolean isLast, Date timing, Date event_time, Date processing_time) + { + + this.trigger_type = trigger_type; + this.freeway = freeway; + this.total_flow = total_flow; + this.number_of_records = number_of_records; + this.window = window; + this.isFirst = isFirst; + this.isLast = isLast; + this.timing = timing; + this.event_time = event_time; + this.processing_time = processing_time; + } + + public String getTrigger_type() + { + return trigger_type; + } + + public void setTrigger_type(String trigger_type) + { + this.trigger_type = trigger_type; + } + + public String getFreeway() + { + return freeway; + } + + public void setFreeway(String freeway) + { + this.freeway = freeway; + } + + public int getTotal_flow() + { + return total_flow; + } + + public void setTotal_flow(int total_flow) + { + this.total_flow = total_flow; + } + + public long getNumber_of_records() + { + return number_of_records; + } + + public void setNumber_of_records(long number_of_records) + { + this.number_of_records = number_of_records; + } + + public String getWindow() + { + return window; + } + + public void setWindow(String window) + { + this.window = window; + } + + public boolean isFirst() + { + return isFirst; + } + + public void setFirst(boolean first) + { + isFirst = first; + } + + public boolean isLast() + { + return isLast; + } + + public void setLast(boolean last) + { + isLast = last; + } + + public Date getTiming() + { + return timing; + } + + public void setTiming(Date timing) + { + this.timing = timing; + } + + public Date getEvent_time() + { + return event_time; + } + + public void setEvent_time(Date event_time) + { + this.event_time = event_time; + } + + public Date getProcessing_time() + { + return processing_time; + } + + public void setProcessing_time(Date processing_time) + { + this.processing_time = processing_time; + } + } + + /** + * Extract the freeway and total flow in a reading. + * Freeway is used as key since we are calculating the total flow for each freeway. 
+ */ + static class ExtractFlowInfo implements Function.MapFunction<String, KeyValPair<String, Integer>> + { + @Override + public KeyValPair<String, Integer> f(String input) + { + String[] laneInfo = input.split(","); + if (laneInfo[0].equals("timestamp")) { + // Header row + return null; + } + if (laneInfo.length < 48) { + //Skip the invalid input. + return null; + } + String freeway = laneInfo[2]; + Integer totalFlow = tryIntegerParse(laneInfo[7]); + // Ignore the records with total flow 0 to easily understand the working of triggers. + // Skip the records with total flow -1 since they are invalid input. + if (totalFlow == null || totalFlow <= 0) { + return null; + } + return new KeyValPair<>(freeway, totalFlow); + } + } + + private static final String PUBSUB_TIMESTAMP_LABEL_KEY = "timestamp_ms"; + + public static void main(String[] args) throws Exception + { + StreamFactory.fromFolder("some folder") + .addCompositeStreams(new CalculateTotalFlow(60)); + + } + + private static Integer tryIntegerParse(String number) + { + try { + return Integer.parseInt(number); + } catch (NumberFormatException e) { + return null; + } + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/demos/pom.xml ---------------------------------------------------------------------- diff --git a/demos/pom.xml b/demos/pom.xml index 12f0f14..e9f2daf 100644 --- a/demos/pom.xml +++ b/demos/pom.xml @@ -192,6 +192,7 @@ <module>r</module> <module>echoserver</module> <module>iteration</module> + <module>highlevelapi</module> </modules> <dependencies> @@ -231,6 +232,17 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-stream</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/pom.xml ---------------------------------------------------------------------- diff --git a/stream/pom.xml b/stream/pom.xml index fd663e0..445be92 100755 --- a/stream/pom.xml +++ b/stream/pom.xml @@ -95,6 +95,5 @@ <version>3.2.1</version> </dependency> - </dependencies> </project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java index 2f65ba9..6d44534 100644 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java @@ -18,11 +18,14 @@ */ package org.apache.apex.malhar.stream.api; - -import java.util.Map; import java.util.concurrent.Callable; +import org.joda.time.Duration; + +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.WindowOption; import org.apache.apex.malhar.stream.api.function.Function; +import org.apache.hadoop.classification.InterfaceStability; import com.datatorrent.api.Attribute; import com.datatorrent.api.Context.DAGContext; @@ -37,6 +40,7 @@ import com.datatorrent.api.Operator; * * @since 3.4.0 */ [email protected] public interface ApexStream<T> { /** @@ -46,17 +50,7 @@ public interface ApexStream<T> * @param <O> Type of the output * @return new stream of type O */ - <O, STREAM 
extends ApexStream<O>> STREAM map(Function.MapFunction<T, O> mapFunction); - - /** - * Simple map transformation<br> - * Add an operator to the DAG which convert tuple T to tuple O - * @param name operator name - * @param mapFunction map function - * @param <O> Type of the output - * @return new stream of type O - */ - <O, STREAM extends ApexStream<O>> STREAM map(String name, Function.MapFunction<T, O> mapFunction); + <O, STREAM extends ApexStream<O>> STREAM map(Function.MapFunction<T, O> mapFunction, Option... opts); /** * Flat map transformation @@ -65,17 +59,7 @@ public interface ApexStream<T> * @param <O> Type of the output * @return new stream of type O */ - <O, STREAM extends ApexStream<O>> STREAM flatMap(Function.FlatMapFunction<T, O> flatten); - - /** - * Flat map transformation<br> - * Add an operator to the DAG which convert tuple T to a collection of tuple O - * @param name operator name - * @param flatten - * @param <O> Type of the output - * @return new stream of type O - */ - <O, STREAM extends ApexStream<O>> STREAM flatMap(String name, Function.FlatMapFunction<T, O> flatten); + <O, STREAM extends ApexStream<O>> STREAM flatMap(Function.FlatMapFunction<T, O> flatten, Option... opts); /** * Filter transformation<br> @@ -83,76 +67,7 @@ public interface ApexStream<T> * @param filter filter function * @return new stream of same type */ - <STREAM extends ApexStream<T>> STREAM filter(Function.FilterFunction<T> filter); - - /** - * Filter transformation<br> - * Add an operator to the DAG which filter out tuple T that cannot satisfy the FilterFunction - * @param name operator name - * @param filter filter function - * @return new stream of same type - */ - <STREAM extends ApexStream<T>> STREAM filter(String name, Function.FilterFunction<T> filter); - - /** - * Reduce transformation<br> - * Add an operator to the DAG which merge tuple t1, t2 to new tuple - * @param reduce reduce function - * @return new stream of same type - */ - <STREAM extends ApexStream<T>> STREAM reduce(Function.ReduceFunction<T> reduce); - - /** - * Reduce transformation<br> - * Add an operator to the DAG which merge tuple t1, t2 to new tuple - * @param name operator name - * @param reduce reduce function - * @return new stream of same type - */ - <STREAM extends ApexStream<T>> STREAM reduce(String name, Function.ReduceFunction<T> reduce); - - /** - * Fold transformation<br> - * Add an operator to the DAG which merge tuple T to accumulated result tuple O - * @param initialValue initial result value - * @param fold fold function - * @param <O> Result type - * @return new stream of type O - */ - <O, STREAM extends ApexStream<O>> STREAM fold(O initialValue, Function.FoldFunction<T, O> fold); - - /** - * Fold transformation<br> - * Add an operator to the DAG which merge tuple T to accumulated result tuple O - * @param name name of the operator - * @param initialValue initial result value - * @param fold fold function - * @param <O> Result type - * @return new stream of type O - */ - <O, STREAM extends ApexStream<O>> STREAM fold(String name, O initialValue, Function.FoldFunction<T, O> fold); - - /** - * Count of all tuples - * @return new stream of Integer - */ - <STREAM extends ApexStream<Integer>> STREAM count(); - - /** - * Count tuples by the key<br> - * If the input is KeyedTuple it will get the key from getKey method from the tuple<br> - * If not, use the tuple itself as a key - * @return new stream of Map - */ - <STREAM extends ApexStream<Map<Object, Integer>>> STREAM countByKey(); - - /** - * - * Count tuples 
by the indexed key - * @param key the index of the field in the tuple that are used as key - * @return new stream of Map - */ - <STREAM extends ApexStream<Map<Object, Integer>>> STREAM countByKey(int key); + <STREAM extends ApexStream<T>> STREAM filter(Function.FilterFunction<T> filter, Option... opts); /** * Extend the dag by adding one operator<br> @@ -162,18 +77,23 @@ public interface ApexStream<T> * @param <O> type of the output * @return new stream of type O */ - <O, STREAM extends ApexStream<O>> STREAM addOperator(Operator op, Operator.InputPort<T> inputPort, Operator.OutputPort<O> outputPort); + <O, STREAM extends ApexStream<O>> STREAM addOperator(Operator op, Operator.InputPort<T> inputPort, Operator.OutputPort<O> outputPort, Option... opts); /** - * Extend the dag by adding one {@see Operator} - * @param opName Operator name + * Extend the dag by adding one end operator<br> * @param op Operator added to the stream * @param inputPort InputPort of the operator that is connected to last exposed OutputPort in the stream - * @param outputPort OutputPort of the operator will be connected to next operator * @param <O> type of the output * @return new stream of type O */ - <O, STREAM extends ApexStream<O>> STREAM addOperator(String opName, Operator op, Operator.InputPort<T> inputPort, Operator.OutputPort<O> outputPort); + <O, STREAM extends ApexStream<O>> STREAM endWith(Operator op, Operator.InputPort<T> inputPort, Option... opts); + + /** + * Extend the dag by adding one {@see CompositeStreamTransform} + * @param compositeStreamTransform Composite Streams and Transforms + * @return new stream of type O + */ + <O, INSTREAM extends ApexStream<T>, OUTSTREAM extends ApexStream<O>> OUTSTREAM addCompositeStreams(CompositeStreamTransform<INSTREAM, OUTSTREAM> compositeStreamTransform); /** * Union multiple stream into one @@ -260,4 +180,30 @@ public interface ApexStream<T> */ void run(); + /** + * Chunk tuples into Windows + * Window Transform are defined in {@see WindowedStream} + * @param windowOption + * @return + */ + WindowedStream<T> window(WindowOption windowOption); + + /** + * Chunk tuple into windows with window option and trigger option + * @param windowOption + * @param triggerOption + * @return + */ + WindowedStream<T> window(WindowOption windowOption, TriggerOption triggerOption); + + /** + * + * Chunk tuple into windows with window option and trigger option and allowed lateness + * @param windowOption + * @param triggerOption + * @param allowLateness + * @return + */ + WindowedStream<T> window(WindowOption windowOption, TriggerOption triggerOption, Duration allowLateness); + } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/CompositeStreamTransform.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/CompositeStreamTransform.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/CompositeStreamTransform.java new file mode 100644 index 0000000..979f44f --- /dev/null +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/CompositeStreamTransform.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.api; + +import org.apache.hadoop.classification.InterfaceStability; + +/** + * A group of Streams and transforms in between + */ [email protected] +public abstract class CompositeStreamTransform<INSTREAM extends ApexStream, OUTSTREAM extends ApexStream> +{ + public abstract OUTSTREAM compose(INSTREAM inputStream); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/Option.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/Option.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/Option.java new file mode 100644 index 0000000..1b8935f --- /dev/null +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/Option.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.stream.api; + +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.api.Attribute; + +/** + * Options for the operators in the dag + */ [email protected] +public interface Option +{ + + class Options + { + public static Option name(String name) + { + return new OpName(name); + } + + public static Option prop(String name, Object value) + { + return new PropSetting(name, value); + } + + public static <T> Option attr(Attribute<T> attr, T obj) + { + return new AttributeSetting<>(attr, obj); + } + } + + /** + * An option used to set the name of the operator + */ + class OpName implements Option + { + + private String name; + + public OpName(String name) + { + this.name = name; + } + + public String getName() + { + return name; + } + } + + /** + * An option used to set the property value of the operator + */ + class PropSetting implements Option + { + + private String name; + + private Object val; + + public PropSetting(String name, Object val) + { + this.name = name; + this.val = val; + } + + public String getName() + { + return name; + } + + public Object getVal() + { + return val; + } + } + + /** + * An option used to set the {@link Attribute} + * @param <T> + */ + class AttributeSetting<T> implements Option + { + private Attribute<T> attr; + + private T value; + + public AttributeSetting(Attribute<T> attr, T value) + { + this.attr = attr; + this.value = value; + } + + public Attribute<T> getAttr() + { + return attr; + } + + public T getValue() + { + return value; + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java index 748a76a..bc99035 100644 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java @@ -18,6 +18,22 @@ */ package org.apache.apex.malhar.stream.api; +import java.util.List; + +import org.joda.time.Duration; + +import org.apache.apex.malhar.lib.window.Accumulation; +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl; +import org.apache.apex.malhar.lib.window.impl.WindowedOperatorImpl; +import org.apache.apex.malhar.stream.api.function.Function; +import org.apache.apex.malhar.stream.api.impl.accumulation.FoldFn; +import org.apache.apex.malhar.stream.api.impl.accumulation.ReduceFn; +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.lib.util.KeyValPair; + /** * <p> * A stream with windowed transformation @@ -51,6 +67,140 @@ package org.apache.apex.malhar.stream.api; * * @since 3.4.0 */ [email protected] public interface WindowedStream<T> extends ApexStream<T> { + + /** + * Count of all tuples + * @return new stream of Integer + */ + <STREAM extends WindowedStream<Tuple.WindowedTuple<Long>>> STREAM count(Option... 
opts); + + /** + * Count tuples by the key<br> + * @param name name of the operator + * @param convertToKeyValue The function convert plain tuple to k,v pair + * @return new stream of Key Value Pair + */ + <K, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, Long>>>> STREAM countByKey(Function.ToKeyValue<T, K, Long> convertToKeyValue, Option... opts); + + /** + * Return top N tuples by the selected key + * @param N how many tuples you want to keep + * @param name name of the operator + * @param convertToKeyVal The function convert plain tuple to k,v pair + * @return new stream of Key and top N tuple of the key + */ + <K, V, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, List<V>>>>> STREAM topByKey(int N, Function.ToKeyValue<T, K, V> convertToKeyVal, Option... opts); + + /** + * Return top N tuples of all tuples in the window + * @param N + * @param name name of the operator + * @return new stream of topN + */ + <STREAM extends WindowedStream<Tuple.WindowedTuple<List<T>>>> STREAM top(int N, Option... opts); + + /** + * Add {@link KeyedWindowedOperatorImpl} with specified {@link Accumulation} <br> + * Accumulate tuples by some key within the window definition in this stream + * Also give a name to the accumulation + * @param accumulation Accumulation function you want to do + * @param convertToKeyVal The function convert plain tuple to k,v pair + * @param <K> The type of the key used to group tuples + * @param <V> The type of value you want to do accumulation on + * @param <O> The output type for each given key that you want to accumulate the value to + * @param <ACCU> The type of accumulation you want to keep (it can be in memory or on disk) + * @param <STREAM> return type + * @return + */ + <K, V, O, ACCU, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, O>>>> STREAM accumulateByKey(Accumulation<V, ACCU, O> accumulation, + Function.ToKeyValue<T, K, V> convertToKeyVal, Option... opts); + + /** + * Add {@link WindowedOperatorImpl} with specified {@link Accumulation} <br> + * Accumulate tuples by some key within the window definition in this stream + * Also give a name to the accumulation + * @param accumulation Accumulation function you want to do + * @param <O> The output type that you want to accumulate the value to + * @param <ACCU> The type of accumulation you want to keep (it can be in memory or on disk) + * @param <STREAM> return type + * @return + */ + <O, ACCU, STREAM extends WindowedStream<Tuple.WindowedTuple<O>>> STREAM accumulate(Accumulation<T, ACCU, O> accumulation, Option... opts); + + /** + * Add {@link WindowedOperatorImpl} with specified {@link ReduceFn} <br> + * Do reduce transformation<br> + * @param reduce reduce function + * @param <STREAM> return type + * @return new stream of same type + */ + <STREAM extends WindowedStream<Tuple.WindowedTuple<T>>> STREAM reduce(ReduceFn<T> reduce, Option... 
opts); + + /** + * Add {@link KeyedWindowedOperatorImpl} with specified {@link ReduceFn} <br> + * Reduce transformation by selected key <br> + * Add an operator to the DAG which merge tuple t1, t2 to new tuple by key + * @param reduce reduce function + * @param convertToKeyVal The function convert plain tuple to k,v pair + * @param <K> The type of key you want to group tuples by + * @param <V> The type of value extract from tuple T + * @param <STREAM> return type + * @return new stream of key value pair + */ + <K, V, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, V>>>> STREAM reduceByKey(ReduceFn<V> reduce, Function.ToKeyValue<T, K, V> convertToKeyVal, Option... opts); + + + /** + * Add {@link WindowedOperatorImpl} with specified {@link FoldFn} <br> + * Fold transformation <br> + * @param fold fold function + * @param <O> output type of fold function + * @param <STREAM> return type + * @return + */ + <O, STREAM extends WindowedStream<Tuple.WindowedTuple<O>>> STREAM fold(FoldFn<T, O> fold, Option... opts); + + /** + * Add {@link KeyedWindowedOperatorImpl} with specified {@link FoldFn} <br> + * Fold transformation by key <br> + * @param fold fold function + * @param <O> Result type + * @return new stream of type O + */ + <K, V, O, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, O>>>> STREAM foldByKey(FoldFn<V, O> fold, Function.ToKeyValue<T, K, V> convertToKeyVal, Option... opts); + + + /** + * Return tuples for each key for each window + * @param <O> + * @param <K> + * @param <STREAM> + * @return + */ + <O, K, STREAM extends WindowedStream<KeyValPair<K, Iterable<O>>>> STREAM groupByKey(Function.ToKeyValue<T, K, O> convertToKeyVal, Option... opts); + + /** + * Return tuples for each window + * @param <STREAM> + * @return + */ + <STREAM extends WindowedStream<Iterable<T>>> STREAM group(); + + /** + * Reset the trigger settings for next transforms + * @param triggerOption + * @param <STREAM> + */ + <STREAM extends WindowedStream<T>> STREAM resetTrigger(TriggerOption triggerOption); + + /** + * Reset the allowedLateness settings for next transforms + * @param allowedLateness + * @param <STREAM> + */ + <STREAM extends WindowedStream<T>> STREAM resetAllowedLateness(Duration allowedLateness); + } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/function/Function.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/function/Function.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/function/Function.java index f4e5e60..d516064 100644 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/function/Function.java +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/function/Function.java @@ -18,11 +18,24 @@ */ package org.apache.apex.malhar.stream.api.function; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.stream.api.operator.FunctionOperator; +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.lib.util.KeyValPair; + /** - * The top level function interface + * The top level function interface <br> + * The function is wrapped by {@link FunctionOperator} <br> + * It takes input from input port of {@link FunctionOperator} ex. {@link FunctionOperator.MapFunctionOperator#input} <br> + * And the output will be emitted using {@link FunctionOperator#tupleOutput} <br> + * Anonymous function is not fully supported. 
It must be <b>stateless</b> and must not be defined in any static context<br> + * If an anonymous function does not work, use a top level function class instead<br> + * A top level function class must have a public no-arg constructor * * @since 3.4.0 */ [email protected] public interface Function { /** @@ -45,26 +58,18 @@ public interface Function } /** - * An interface defines a reduce transformation + * A special map function that converts any POJO to a key/value pair data structure * @param <T> + * @param <K> + * @param <V> */ - public static interface ReduceFunction<T> extends Function + public static interface ToKeyValue<T, K, V> extends MapFunction<T, Tuple<KeyValPair<K, V>>> { - T reduce(T t1, T t2); - } - /** - * An interface that defines a fold transformation - * @param <I> - * @param <O> - */ - public static interface FoldFunction<I, O> extends Function - { - O fold(I input, O output); } /** - * An interface that defines flatmap transforation + * An interface that defines flatmap transformation * @param <I> * @param <O> */ @@ -76,7 +81,8 @@ public interface Function * An interface that defines filter transformation * @param <T> */ - public static interface FilterFunction<T> extends MapFunction<T, Boolean> + public static interface FilterFunction<T> extends Function + { + boolean f(T input); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java index 2ff6d51..032cb03 100644 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java @@ -27,28 +27,39 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; +import org.joda.time.Duration; + +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.WindowOption; + import org.apache.apex.malhar.stream.api.ApexStream; +import org.apache.apex.malhar.stream.api.CompositeStreamTransform; +import org.apache.apex.malhar.stream.api.Option; +import org.apache.apex.malhar.stream.api.WindowedStream; import org.apache.apex.malhar.stream.api.function.Function; +import org.apache.apex.malhar.stream.api.function.Function.FlatMapFunction; import org.apache.apex.malhar.stream.api.operator.FunctionOperator; import org.apache.commons.beanutils.BeanUtils; import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.classification.InterfaceStability; import com.datatorrent.api.Attribute; import com.datatorrent.api.Context; import com.datatorrent.api.DAG; import com.datatorrent.api.LocalMode; import com.datatorrent.api.Operator; -import com.datatorrent.lib.algo.UniqueCounter; import com.datatorrent.lib.io.ConsoleOutputOperator; import com.datatorrent.stram.StramLocalCluster; import com.datatorrent.stram.plan.logical.LogicalPlan; /** - * Default stream implementation for ApexStream interface. 
<br> + * It creates the dag(execution plan) from stream api <br> + * The dag won't be constructed until {@link #populateDag(DAG)} is called * * @since 3.4.0 */ [email protected] public class ApexStreamImpl<T> implements ApexStream<T> { @@ -135,18 +146,17 @@ public class ApexStreamImpl<T> implements ApexStream<T> } } + /** * Graph behind the stream */ - private DagMeta graph; - - private ApexStream<T> delegator; + protected DagMeta graph; /** - * Right now the stream only support single extend point - * You can have multiple downstream operators connect to this single extend point though + * Right now the stream only support single extension point + * You can have multiple downstream operators connect to this single extension point though */ - private Brick<T> lastBrick; + protected Brick<T> lastBrick; public Brick<T> getLastBrick() { @@ -163,13 +173,11 @@ public class ApexStreamImpl<T> implements ApexStream<T> graph = new DagMeta(); } - public ApexStreamImpl(ApexStream<T> apexStream) + public ApexStreamImpl(ApexStreamImpl<T> apexStream) { - this.delegator = apexStream; - if (delegator != null && delegator instanceof ApexStreamImpl) { - graph = ((ApexStreamImpl)delegator).graph; - lastBrick = ((ApexStreamImpl<T>)delegator).lastBrick; - } + //copy the variables over to the new ApexStreamImpl + graph = apexStream.graph; + lastBrick = apexStream.lastBrick; } public ApexStreamImpl(DagMeta graph) @@ -184,128 +192,52 @@ public class ApexStreamImpl<T> implements ApexStream<T> } @Override - public <O, STREAM extends ApexStream<O>> STREAM map(Function.MapFunction<T, O> mf) - { - return map(mf.toString(), mf); - } - - @Override - @SuppressWarnings("unchecked") - public <O, STREAM extends ApexStream<O>> STREAM map(String name, Function.MapFunction<T, O> mf) + public <O, STREAM extends ApexStream<O>> STREAM map(Function.MapFunction<T, O> mf, Option... opts) { FunctionOperator.MapFunctionOperator<T, O> opt = new FunctionOperator.MapFunctionOperator<>(mf); - return (STREAM)addOperator(name, opt, opt.input, opt.output); + return addOperator(opt, opt.input, opt.output, opts); } - @Override - public <O, STREAM extends ApexStream<O>> STREAM flatMap(Function.FlatMapFunction<T, O> flatten) - { - return flatMap(flatten.toString(), flatten); - } @Override - @SuppressWarnings("unchecked") - public <O, STREAM extends ApexStream<O>> STREAM flatMap(String name, Function.FlatMapFunction<T, O> flatten) + public <O, STREAM extends ApexStream<O>> STREAM flatMap(FlatMapFunction<T, O> flatten, Option... opts) { FunctionOperator.FlatMapFunctionOperator<T, O> opt = new FunctionOperator.FlatMapFunctionOperator<>(flatten); - return (STREAM)addOperator(name, opt, opt.input, opt.output); - } - - @Override - public <STREAM extends ApexStream<T>> STREAM filter(final Function.FilterFunction<T> filter) - { - return filter(filter.toString(), filter); + return addOperator(opt, opt.input, opt.output, opts); } @Override @SuppressWarnings("unchecked") - public <STREAM extends ApexStream<T>> STREAM filter(String name, final Function.FilterFunction<T> filter) + public <STREAM extends ApexStream<T>> STREAM filter(final Function.FilterFunction<T> filter, Option... 
opts) { FunctionOperator.FilterFunctionOperator<T> filterFunctionOperator = new FunctionOperator.FilterFunctionOperator<>(filter); - return (STREAM)addOperator(name, filterFunctionOperator, filterFunctionOperator.input, filterFunctionOperator.output); - } - - @Override - public <STREAM extends ApexStream<T>> STREAM reduce(Function.ReduceFunction<T> reduce) - { - return reduce(reduce.toString(), reduce); - } - - @Override - @SuppressWarnings("unchecked") - public <STREAM extends ApexStream<T>> STREAM reduce(String name, Function.ReduceFunction<T> reduce) - { - FunctionOperator.ReduceFunctionOperator<T> opt = new FunctionOperator.ReduceFunctionOperator<>(reduce); - return (STREAM)addOperator(name, opt, opt.input, opt.output); - } - - @Override - public <O, STREAM extends ApexStream<O>> STREAM fold(final O initialValue, Function.FoldFunction<T, O> fold) - { - return fold(fold.toString(), initialValue, fold); - } - - @Override - @SuppressWarnings("unchecked") - public <O, STREAM extends ApexStream<O>> STREAM fold(String name, O initialValue, Function.FoldFunction<T, O> fold) - { - FunctionOperator.FoldFunctionOperator<T, O> opt = new FunctionOperator.FoldFunctionOperator<>(fold, initialValue); - return (STREAM)addOperator(name, opt, opt.input, opt.output); - } - - @Override - public <STREAM extends ApexStream<Integer>> STREAM count() - { - throw new UnsupportedOperationException(); - } - - @Override - public <STREAM extends ApexStream<Map<Object, Integer>>> STREAM countByKey(int key) - { - throw new UnsupportedOperationException(); + return addOperator(filterFunctionOperator, filterFunctionOperator.input, filterFunctionOperator.output, opts); } - @Override - @SuppressWarnings("unchecked") - public <STREAM extends ApexStream<Map<Object, Integer>>> STREAM countByKey() + public <STREAM extends ApexStream<Map.Entry<Object, Integer>>> STREAM countByElement() { - // Needs to change the unique counter to support keys - UniqueCounter<Object> uniqueCounter = new UniqueCounter<>(); - uniqueCounter.setCumulative(true); - Operator.OutputPort<? extends Map<Object, Integer>> resultPort = uniqueCounter.count; - return (STREAM)addOperator("CounterByKey", uniqueCounter, (Operator.InputPort<T>)uniqueCounter.data, resultPort); + return null; } @Override - public <O, STREAM extends ApexStream<O>> STREAM addOperator(Operator op, Operator.InputPort<T> inputPort, Operator.OutputPort<O> outputPort) + public <O, STREAM extends ApexStream<O>> STREAM endWith(Operator op, Operator.InputPort<T> inputPort, Option... opts) { - return addOperator(op.toString(), op, inputPort, outputPort); + return (STREAM)addOperator(op, inputPort, null, opts); } - @Override @SuppressWarnings("unchecked") - public <O, STREAM extends ApexStream<O>> STREAM addOperator(String opName, Operator op, Operator.InputPort<T> inputPort, Operator.OutputPort<O> outputPort) + public <O, STREAM extends ApexStream<O>> STREAM addOperator(Operator op, Operator.InputPort<T> inputPort, Operator.OutputPort<O> outputPort, Option... 
opts) { - if (delegator != null) { - ApexStreamImpl<O> apexStream = delegator.addOperator(opName, op, inputPort, outputPort); - try { - return (STREAM)this.getClass().getConstructor(ApexStream.class).newInstance(apexStream); - } catch (Exception e) { - throw new RuntimeException("You have to override the default constructor with ApexStreamImpl as delegator"); - } - } - checkArguments(op, inputPort, outputPort); DagMeta.NodeMeta nm = null; if (lastBrick == null) { - nm = graph.addNode(opName, op, null, null, inputPort); + nm = graph.addNode(op, null, null, inputPort, opts); } else { - - nm = graph.addNode(opName, op, lastBrick.nodeMeta, lastBrick.lastOutput, inputPort); + nm = graph.addNode(op, lastBrick.nodeMeta, lastBrick.lastOutput, inputPort, opts); } Brick<O> newBrick = new Brick<>(); @@ -315,9 +247,25 @@ public class ApexStreamImpl<T> implements ApexStream<T> newBrick.lastStream = Pair.<Operator.OutputPort, Operator.InputPort>of(lastBrick.lastOutput, inputPort); } - return (STREAM)new ApexStreamImpl<>(this.graph, newBrick); + if (this.getClass() == ApexStreamImpl.class || this.getClass() == ApexWindowedStreamImpl.class) { + return (STREAM)newStream(this.graph, newBrick); + } else { + try { + return (STREAM)this.getClass().getConstructor(ApexStreamImpl.class).newInstance(newStream(this.graph, newBrick)); + } catch (Exception e) { + throw new RuntimeException("You have to override the default constructor with ApexStreamImpl as default parameter", e); + } + } + + } + + @Override + public <O, INSTREAM extends ApexStream<T>, OUTSTREAM extends ApexStream<O>> OUTSTREAM addCompositeStreams(CompositeStreamTransform<INSTREAM, OUTSTREAM> compositeStreamTransform) + { + return compositeStreamTransform.compose((INSTREAM)this); } + /* Check to see if inputPort and outputPort belongs to the operator */ private void checkArguments(Operator op, Operator.InputPort inputPort, Operator.OutputPort outputPort) { @@ -362,8 +310,8 @@ public class ApexStreamImpl<T> implements ApexStream<T> public ApexStreamImpl<T> print() { ConsoleOutputOperator consoleOutputOperator = new ConsoleOutputOperator(); - addOperator(IDGenerator.generateOperatorIDWithUUID(consoleOutputOperator.getClass()), consoleOutputOperator, - (Operator.InputPort<T>)consoleOutputOperator.input, null); + addOperator(consoleOutputOperator, + (Operator.InputPort<T>)consoleOutputOperator.input, null, Option.Options.name(IDGenerator.generateOperatorIDWithUUID(consoleOutputOperator.getClass()))); return this; } @@ -469,6 +417,7 @@ public class ApexStreamImpl<T> implements ApexStream<T> { LocalMode lma = LocalMode.newInstance(); populateDag(lma.getDAG()); + DAG dag = lma.getDAG(); LocalMode.Controller lc = lma.getController(); if (lc instanceof StramLocalCluster) { ((StramLocalCluster)lc).setExitCondition(exitCondition); @@ -493,5 +442,36 @@ public class ApexStreamImpl<T> implements ApexStream<T> //TODO need an api to submit the StreamingApplication to cluster } + @Override + public WindowedStream<T> window(WindowOption option) + { + return window(option, null, null); + } + + @Override + public WindowedStream<T> window(WindowOption windowOption, TriggerOption triggerOption) + { + return window(windowOption, triggerOption, null); + } + + @Override + public WindowedStream<T> window(WindowOption windowOption, TriggerOption triggerOption, Duration allowLateness) + { + ApexWindowedStreamImpl<T> windowedStream = new ApexWindowedStreamImpl<>(); + windowedStream.lastBrick = lastBrick; + windowedStream.graph = graph; + windowedStream.windowOption = 
windowOption; + windowedStream.triggerOption = triggerOption; + windowedStream.allowedLateness = allowLateness; + return windowedStream; + } + + protected <O> ApexStream<O> newStream(DagMeta graph, Brick<O> newBrick) + { + ApexStreamImpl<O> newstream = new ApexStreamImpl<>(); + newstream.graph = graph; + newstream.lastBrick = newBrick; + return newstream; + } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java new file mode 100644 index 0000000..a293ea8 --- /dev/null +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java @@ -0,0 +1,275 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.api.impl; + +import java.util.List; + +import org.joda.time.Duration; + +import org.apache.apex.malhar.lib.window.Accumulation; +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.WindowOption; +import org.apache.apex.malhar.lib.window.WindowState; + +import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage; +import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedStorage; +import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl; +import org.apache.apex.malhar.lib.window.impl.WindowedOperatorImpl; +import org.apache.apex.malhar.stream.api.ApexStream; +import org.apache.apex.malhar.stream.api.Option; +import org.apache.apex.malhar.stream.api.WindowedStream; +import org.apache.apex.malhar.stream.api.function.Function; + +import org.apache.apex.malhar.stream.api.impl.accumulation.Count; +import org.apache.apex.malhar.stream.api.impl.accumulation.FoldFn; +import org.apache.apex.malhar.stream.api.impl.accumulation.ReduceFn; +import org.apache.apex.malhar.stream.api.impl.accumulation.TopN; +import org.apache.commons.lang3.mutable.MutableLong; +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.lib.util.KeyValPair; + +/** + * Default windowed stream implementation for WindowedStream interface. 
+ * It adds more windowed transform for Stream interface + * + * @since 3.4.0 + */ [email protected] +public class ApexWindowedStreamImpl<T> extends ApexStreamImpl<T> implements WindowedStream<T> +{ + + protected WindowOption windowOption; + + protected TriggerOption triggerOption; + + protected Duration allowedLateness; + + private class ConvertFn<T> implements Function.MapFunction<T, Tuple<T>> + { + + @Override + public Tuple<T> f(T input) + { + if (input instanceof Tuple.TimestampedTuple) { + return (Tuple.TimestampedTuple)input; + } else { + return new Tuple.TimestampedTuple<>(System.currentTimeMillis(), input); + } + } + } + + + public ApexWindowedStreamImpl() + { + } + + @Override + public <STREAM extends WindowedStream<Tuple.WindowedTuple<Long>>> STREAM count(Option... opts) + { + Function.MapFunction<T, Tuple<Long>> kVMap = new Function.MapFunction<T, Tuple<Long>>() + { + @Override + public Tuple<Long> f(T input) + { + if (input instanceof Tuple.TimestampedTuple) { + return new Tuple.TimestampedTuple<>(((Tuple.TimestampedTuple)input).getTimestamp(), 1L); + } else { + return new Tuple.TimestampedTuple<>(System.currentTimeMillis(), 1L); + } + } + }; + + WindowedStream<Tuple<Long>> innerstream = map(kVMap); + WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createWindowedOperator(new Count()); + return innerstream.addOperator(windowedOperator, windowedOperator.input, windowedOperator.output, opts); + } + + @Override + public <K, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, Long>>>> STREAM countByKey(Function.ToKeyValue<T, K, Long> convertToKeyValue, Option... opts) + { + WindowedStream<Tuple<KeyValPair<K, Long>>> kvstream = map(convertToKeyValue); + KeyedWindowedOperatorImpl<K, Long, MutableLong, Long> keyedWindowedOperator = createKeyedWindowedOperator(new Count()); + return kvstream.addOperator(keyedWindowedOperator, keyedWindowedOperator.input, keyedWindowedOperator.output, opts); + } + + @Override + public <K, V, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, List<V>>>>> STREAM topByKey(int N, Function.ToKeyValue<T, K, V> convertToKeyVal, Option... opts) + { + TopN<V> top = new TopN<>(); + top.setN(N); + WindowedStream<Tuple<KeyValPair<K, V>>> kvstream = map(convertToKeyVal); + KeyedWindowedOperatorImpl<K, V, List<V>, List<V>> keyedWindowedOperator = createKeyedWindowedOperator(top); + return kvstream.addOperator(keyedWindowedOperator, keyedWindowedOperator.input, keyedWindowedOperator.output, opts); + } + + @Override + public <STREAM extends WindowedStream<Tuple.WindowedTuple<List<T>>>> STREAM top(int N, Option... opts) + { + + TopN<T> top = new TopN<>(); + top.setN(N); + WindowedStream<Tuple<T>> innerstream = map(new ConvertFn<T>()); + WindowedOperatorImpl<T, List<T>, List<T>> windowedOperator = createWindowedOperator(top); + return innerstream.addOperator(windowedOperator, windowedOperator.input, windowedOperator.output, opts); + } + + + @Override + public <K, V, O, ACCU, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, O>>>> STREAM accumulateByKey(Accumulation<V, ACCU, O> accumulation, + Function.ToKeyValue<T, K, V> convertToKeyVal, Option... 
opts) + { + WindowedStream<Tuple<KeyValPair<K, V>>> kvstream = map(convertToKeyVal); + KeyedWindowedOperatorImpl<K, V, ACCU, O> keyedWindowedOperator = createKeyedWindowedOperator(accumulation); + return kvstream.addOperator(keyedWindowedOperator, keyedWindowedOperator.input, keyedWindowedOperator.output, opts); + } + + + @Override + public <O, ACCU, STREAM extends WindowedStream<Tuple.WindowedTuple<O>>> STREAM accumulate(Accumulation<T, ACCU, O> accumulation, Option... opts) + { + WindowedStream<Tuple<T>> innerstream = map(new ConvertFn<T>()); + WindowedOperatorImpl<T, ACCU, O> windowedOperator = createWindowedOperator(accumulation); + return innerstream.addOperator(windowedOperator, windowedOperator.input, windowedOperator.output, opts); + } + + + @Override + public <STREAM extends WindowedStream<Tuple.WindowedTuple<T>>> STREAM reduce(ReduceFn<T> reduce, Option... opts) + { + WindowedStream<Tuple<T>> innerstream = map(new ConvertFn<T>()); + WindowedOperatorImpl<T, T, T> windowedOperator = createWindowedOperator(reduce); + return innerstream.addOperator(windowedOperator, windowedOperator.input, windowedOperator.output, opts); + } + + @Override + public <K, V, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, V>>>> STREAM reduceByKey(ReduceFn<V> reduce, Function.ToKeyValue<T, K, V> convertToKeyVal, Option... opts) + { + WindowedStream<Tuple<KeyValPair<K, V>>> kvstream = map(convertToKeyVal); + KeyedWindowedOperatorImpl<K, V, V, V> keyedWindowedOperator = createKeyedWindowedOperator(reduce); + return kvstream.addOperator(keyedWindowedOperator, keyedWindowedOperator.input, keyedWindowedOperator.output, opts); + } + + @Override + public <O, STREAM extends WindowedStream<Tuple.WindowedTuple<O>>> STREAM fold(FoldFn<T, O> fold, Option... opts) + { + WindowedStream<Tuple<T>> innerstream = map(new ConvertFn<T>()); + WindowedOperatorImpl<T, O, O> windowedOperator = createWindowedOperator(fold); + return innerstream.addOperator(windowedOperator, windowedOperator.input, windowedOperator.output, opts); + } + + @Override + public <K, V, O, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, O>>>> STREAM foldByKey(FoldFn<V, O> fold, Function.ToKeyValue<T, K, V> convertToKeyVal, Option... opts) + { + WindowedStream<Tuple<KeyValPair<K, V>>> kvstream = map(convertToKeyVal); + KeyedWindowedOperatorImpl<K, V, O, O> keyedWindowedOperator = createKeyedWindowedOperator(fold); + return kvstream.addOperator(keyedWindowedOperator, keyedWindowedOperator.input, keyedWindowedOperator.output, opts); + + } + + @Override + public <O, K, STREAM extends WindowedStream<KeyValPair<K, Iterable<O>>>> STREAM groupByKey(Function.ToKeyValue<T, K, O> convertToKeyVal, Option... 
opts) + { + throw new UnsupportedOperationException(); + } + + @Override + public <STREAM extends WindowedStream<Iterable<T>>> STREAM group() + { + throw new UnsupportedOperationException(); + } + + @Override + public <STREAM extends WindowedStream<T>> STREAM resetTrigger(TriggerOption option) + { + triggerOption = option; + return (STREAM)this; + } + + @Override + public <STREAM extends WindowedStream<T>> STREAM resetAllowedLateness(Duration allowedLateness) + { + this.allowedLateness = allowedLateness; + return (STREAM)this; + } + + @Override + protected <O> ApexStream<O> newStream(DagMeta graph, Brick<O> newBrick) + { + ApexWindowedStreamImpl<O> newstream = new ApexWindowedStreamImpl<>(); + newstream.graph = graph; + newstream.lastBrick = newBrick; + newstream.windowOption = this.windowOption; + newstream.triggerOption = this.triggerOption; + newstream.allowedLateness = this.allowedLateness; + return newstream; + } + + /** + * Create the windowed operator for windowed transformation + * @param accumulationFn + * @param <IN> + * @param <ACCU> + * @param <OUT> + * @return + */ + private <IN, ACCU, OUT> WindowedOperatorImpl<IN, ACCU, OUT> createWindowedOperator(Accumulation<IN, ACCU, OUT> accumulationFn) + { + WindowedOperatorImpl<IN, ACCU, OUT> windowedOperator = new WindowedOperatorImpl<>(); + //TODO use other default setting in the future + windowedOperator.setDataStorage(new InMemoryWindowedStorage<ACCU>()); + windowedOperator.setRetractionStorage(new InMemoryWindowedStorage<OUT>()); + windowedOperator.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>()); + if (windowOption != null) { + windowedOperator.setWindowOption(windowOption); + } + if (triggerOption != null) { + windowedOperator.setTriggerOption(triggerOption); + } + if (allowedLateness != null) { + windowedOperator.setAllowedLateness(allowedLateness); + } + windowedOperator.setAccumulation(accumulationFn); + return windowedOperator; + } + + private <K, V, ACCU, OUT> KeyedWindowedOperatorImpl<K, V, ACCU, OUT> createKeyedWindowedOperator(Accumulation<V, ACCU, OUT> accumulationFn) + { + KeyedWindowedOperatorImpl<K, V, ACCU, OUT> keyedWindowedOperator = new KeyedWindowedOperatorImpl<>(); + + //TODO use other default setting in the future + keyedWindowedOperator.setDataStorage(new InMemoryWindowedKeyedStorage<K, ACCU>()); + keyedWindowedOperator.setRetractionStorage(new InMemoryWindowedKeyedStorage<K, OUT>()); + keyedWindowedOperator.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>()); + if (windowOption != null) { + keyedWindowedOperator.setWindowOption(windowOption); + } + if (triggerOption != null) { + keyedWindowedOperator.setTriggerOption(triggerOption); + } + if (allowedLateness != null) { + keyedWindowedOperator.setAllowedLateness(allowedLateness); + } + + keyedWindowedOperator.setAccumulation(accumulationFn); + return keyedWindowedOperator; + } + +}
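
----------------------------------------------------------------------
A rough usage sketch of the windowed API above: a source stream is turned into a WindowedStream via window(...), and countByKey(...) attaches a KeyedWindowedOperatorImpl backed by the Count accumulation. The StreamFactory.fromFolder(...) source, the Tuple.PlainTuple wrapper, the GlobalWindow/TriggerOption chaining, and the input path are assumptions taken from the surrounding library rather than guarantees of this diff.

import org.joda.time.Duration;

import org.apache.apex.malhar.lib.window.TriggerOption;
import org.apache.apex.malhar.lib.window.Tuple;
import org.apache.apex.malhar.lib.window.WindowOption;
import org.apache.apex.malhar.stream.api.ApexStream;
import org.apache.apex.malhar.stream.api.WindowedStream;
import org.apache.apex.malhar.stream.api.function.Function;
import org.apache.apex.malhar.stream.api.impl.StreamFactory;

import com.datatorrent.lib.util.KeyValPair;

public class WindowedWordCountSketch
{
  // Top level function class with a public no-arg constructor, as the
  // Function javadoc above recommends over anonymous classes.
  public static class ToWordCountPair implements Function.ToKeyValue<String, String, Long>
  {
    @Override
    public Tuple<KeyValPair<String, Long>> f(String word)
    {
      // Assumed Tuple.PlainTuple wrapper; any Tuple subtype would do here.
      return new Tuple.PlainTuple<>(new KeyValPair<>(word, 1L));
    }
  }

  public static void main(String[] args)
  {
    // Assumed StreamFactory source; substitute any ApexStream<String> input.
    ApexStream<String> words = StreamFactory.fromFolder("/tmp/words");

    // Window the stream globally, fire early every second, then count per word.
    WindowedStream<String> windowed = words.window(new WindowOption.GlobalWindow(),
        new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(1000)).accumulatingFiredPanes());

    windowed.countByKey(new ToWordCountPair()).print();
    // The pipeline is only materialized once it is populated into a DAG,
    // e.g. via populateDag(dag) inside a StreamingApplication, as in ApexStreamImpl above.
  }
}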

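The reworked FilterFunction above returns a boolean directly instead of extending MapFunction<T, Boolean>, and operator names now travel through the Option varargs rather than a dedicated name argument. A minimal sketch of both, assuming a String stream produced as in the previous example:

import org.apache.apex.malhar.stream.api.ApexStream;
import org.apache.apex.malhar.stream.api.Option;
import org.apache.apex.malhar.stream.api.function.Function;

public class FilterSketch
{
  // Top level function class, per the Function javadoc guidance that
  // anonymous functions are not fully supported.
  public static class NonEmptyLine implements Function.FilterFunction<String>
  {
    @Override
    public boolean f(String line)
    {
      // New FilterFunction contract from this commit: return the predicate result directly.
      return line != null && !line.trim().isEmpty();
    }
  }

  public static ApexStream<String> dropEmptyLines(ApexStream<String> lines)
  {
    // The operator name is passed as an Option, mirroring how print() uses
    // Option.Options.name(...) in ApexStreamImpl above.
    return lines.filter(new NonEmptyLine(), Option.Options.name("NonEmptyFilter"));
  }
}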