Repository: flink
Updated Branches:
  refs/heads/master 65ce51c24 -> 6abbba248

[FLINK-6198] [cep] [doc] Update CEP documentation.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6abbba24
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6abbba24
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6abbba24

Branch: refs/heads/master
Commit: 6abbba248876379d388bca42f387adde1518f198
Parents: cf05bcb
Author: David Anderson <[email protected]>
Authored: Wed Jun 7 14:23:51 2017 +0200
Committer: kkloudas <[email protected]>
Committed: Mon Jun 12 10:53:18 2017 +0200

----------------------------------------------------------------------
 docs/dev/libs/cep.md | 260 +++++++++++++++++++++++-----------------------
 1 file changed, 132 insertions(+), 128 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/6abbba24/docs/dev/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index 6a2fb53..ab0e23d 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -69,8 +69,8 @@ See linking with it for cluster execution [here]({{site.baseurl}}/dev/linking.ht

 Now you can start writing your first CEP program using the Pattern API.

-<span class="label label-danger">Attention</span> The events in the `DataStream` to which
-you want to apply pattern matching have to implement proper `equals()` and `hashCode()` methods
+{% warn Attention %} The events in the `DataStream` to which
+you want to apply pattern matching must implement proper `equals()` and `hashCode()` methods
 because these are used for comparing and matching events.

 <div class="codetabs" markdown="1">
@@ -135,17 +135,17 @@ from your input stream.

 Each such complex pattern sequence consists of multiple simple patterns, i.e. patterns looking for individual events with the same properties.
 From now on, these simple patterns will be called **patterns**, and
-the final complex pattern sequence we are searching in the stream, **pattern sequence**. A pattern sequence
+the final complex pattern sequence we are searching for in the stream, the **pattern sequence**. A pattern sequence
 can be seen as a graph of such patterns, where transitions from one pattern to the next occur based on user-specified
-*conditions*, e.g. `event.getName().equals("start")`. A *match* is a sequence of input events which visits all
+*conditions*, e.g. `event.getName().equals("start")`. A **match** is a sequence of input events which visits all
 patterns of the complex pattern graph, through a sequence of valid pattern transitions.

-<span class="label label-danger">Attention</span> Each pattern must have a unique name to identify the matched
-events later on.
+{% warn Attention %} Each pattern must have a unique name, which is used to later
+identify the matched events.

-<span class="label label-danger">Attention</span> Pattern names **CANNOT** contain the character `":"`.
+{% warn Attention %} Pattern names **CANNOT** contain the character `":"`.

-In the remainder, we start by describing how to define [Patterns](#individual-patterns), before describing how you can
+In the remainder of this section we will first describe how to define [Individual Patterns](#individual-patterns), and then cover how you can
 combine individual patterns into [Complex Patterns](#combining-patterns).

 ### Individual Patterns
@@ -154,13 +154,13 @@ A **Pattern** can be either a *singleton* pattern, or a *looping* one. Singleton
 event, while looping ones can accept more than one. In pattern matching symbols, in the pattern `"a b+ c? d"` (or `"a"`,
 followed by *one or more* `"b"`'s, optionally followed by a `"c"`, followed by a `"d"`), `a`, `c?`, and `d` are
 singleton patterns, while `b+` is a looping one. By default, a pattern is a singleton pattern and you can transform
-it to a looping one using [Quantifiers](#quantifiers). In addition, each pattern can have one or more
+it to a looping one by using [Quantifiers](#quantifiers). In addition, each pattern can have one or more
 [Conditions](#conditions) based on which it accepts events.

 #### Quantifiers

-In FlinkCEP, looping patterns can be specified using the methods: `pattern.oneOrMore()`, for patterns that expect one or
-more occurrences of a given event (e.g. the `b+` mentioned previously), and `pattern.times(#ofTimes)` for patterns that
+In FlinkCEP, looping patterns can be specified using these methods: `pattern.oneOrMore()`, for patterns that expect one or
+more occurrences of a given event (e.g. the `b+` mentioned previously); and `pattern.times(#ofTimes)`, for patterns that
 expect a specific number of occurrences of a given type of event, e.g. 4 `a`'s. All patterns, looping or not, can be made
 optional using the `pattern.optional()` method. For a pattern named `start`, the following are valid quantifiers:

@@ -257,7 +257,7 @@ middle.oneOrMore().where(
 </div>
 </div>

-<span class="label label-danger">Attention</span> The call to `context.getEventsForPattern(...)` finds all the
+{% warn Attention %} The call to `context.getEventsForPattern(...)` finds all the
 previously accepted events for a given potential match. The cost of this operation can vary, so when implementing
 your condition, try to minimize its use.

@@ -308,7 +308,7 @@ start.subtype(classOf[SubEvent]).where(subEvent => ... /* some condition */)
 **Combining Conditions:** As shown, the `subtype` condition can be combined with additional conditions. In fact, this holds
 for every condition. You can arbitrarily combine conditions by sequentially calling `where()`. The final result
 will be the logical **AND** of the results of the individual conditions. In
-order to combine conditions using **OR**, you can call the `or()` method, as shown below.
+order to combine conditions using **OR**, you can use the `or()` method, as shown below.

 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -353,12 +353,18 @@ input `"a1", "c", "a2", "b"` will have the following results:

  2. Relaxed Contiguity: `{a1 b}` and `{a1 a2 b}` -- `c` is simply ignored.

- 3. Non-Deterministic Relaxed Contiguity: `{a1 b}`, `{a2 b}` and `{a1 a2 b}`.
+ 3. Non-Deterministic Relaxed Contiguity: `{a1 b}`, `{a2 b}`, and `{a1 a2 b}`.

 For looping patterns (e.g. `oneOrMore()` and `times()`) the default is *relaxed contiguity*. If you want
 strict contiguity, you have to explicitly specify it by using the `consecutive()` call, and if you want
 *non-deterministic relaxed contiguity* you can use the `allowCombinations()` call.

+{% warn Attention %}
+In this section we are talking about contiguity *within* a single looping pattern, and the
+`consecutive()` and `allowCombinations()` calls need to be understood in that context. Later when looking at
+[Combining Patterns](#combining-patterns) we'll discuss other calls, such as `next()` and `followedBy()`,
+that are used to specify contiguity conditions *between* patterns.
+
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 <table class="table table-bordered">
@@ -373,7 +379,7 @@ strict contiguity, you have to explicitly specify it by using the `consecutive()
 <td><strong>where(condition)</strong></td>
 <td>
 <p>Defines a condition for the current pattern. To match the pattern, an event must satisfy the condition.
- Multiple consecutive where() clauses lead to their condtions being ANDed:</p>
+ Multiple consecutive where() clauses lead to their conditions being ANDed:</p>
 {% highlight java %}
 pattern.where(new IterativeCondition<Event>() {
     @Override
@@ -418,19 +424,19 @@ pattern.subtype(SubEvent.class);
 <td><strong>oneOrMore()</strong></td>
 <td>
 <p>Specifies that this pattern expects at least one occurrence of a matching event.</p>
- <p>By default a relaxed internal contiguity (between subsequent events) is used. For more info on the
- internal contiguity see <a href="#consecutive_java">consecutive</a></p>
- {% highlight java %}
- pattern.oneOrMore();
- {% endhighlight %}
+ <p>By default a relaxed internal contiguity (between subsequent events) is used. For more info on
+ internal contiguity see <a href="#consecutive_java">consecutive</a>.</p>
+{% highlight java %}
+pattern.oneOrMore();
+{% endhighlight %}
 </td>
 </tr>
 <tr>
 <td><strong>times(#ofTimes)</strong></td>
 <td>
 <p>Specifies that this pattern expects an exact number of occurrences of a matching event.</p>
- <p>By default a relaxed internal contiguity (between subsequent events) is used. For more info on the
- internal contiguity see <a href="#consecutive_java">consecutive</a></p>
+ <p>By default a relaxed internal contiguity (between subsequent events) is used. For more info on
+ internal contiguity see <a href="#consecutive_java">consecutive</a>.</p>
 {% highlight java %}
 pattern.times(2);
 {% endhighlight %}
@@ -441,39 +447,39 @@ pattern.times(2);
 <td>
 <p>Specifies that this pattern is optional, i.e. it may not occur at all.
 This is applicable to all aforementioned quantifiers.</p>
- {% highlight java %}
- pattern.oneOrMore().optional();
- {% endhighlight %}
+{% highlight java %}
+pattern.oneOrMore().optional();
+{% endhighlight %}
 </td>
 </tr>
 <tr>
 <td><strong>consecutive()</strong><a name="consecutive_java"></a></td>
 <td>
- <p>Works in conjunction with oneOrMore() and times() and imposes strict contiguity between the matching
- events, i.e. any non-matching element breaks the match (as in next()).</p>
- <p>If not applied a relaxed contiguity (as in followedBy()) is used.</p>
+ <p>Works in conjunction with <code>oneOrMore()</code> and <code>times()</code> and imposes strict contiguity between the matching
+ events, i.e. any non-matching element breaks the match (as in <code>next()</code>).</p>
+ <p>If not applied a relaxed contiguity (as in <code>followedBy()</code>) is used.</p>

 <p>E.g. a pattern like:</p>
- {% highlight java %}
- Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-     @Override
-     public boolean filter(Event value) throws Exception {
-         return value.getName().equals("c");
-     }
- })
- .followedBy("middle").where(new SimpleCondition<Event>() {
-     @Override
-     public boolean filter(Event value) throws Exception {
-         return value.getName().equals("a");
-     }
- }).oneOrMore().consecutive()
- .followedBy("end1").where(new SimpleCondition<Event>() {
-     @Override
-     public boolean filter(Event value) throws Exception {
-         return value.getName().equals("b");
-     }
- });
- {% endhighlight %}
+{% highlight java %}
+Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+    @Override
+    public boolean filter(Event value) throws Exception {
+        return value.getName().equals("c");
+    }
+})
+.followedBy("middle").where(new SimpleCondition<Event>() {
+    @Override
+    public boolean filter(Event value) throws Exception {
+        return value.getName().equals("a");
+    }
+}).oneOrMore().consecutive()
+.followedBy("end1").where(new SimpleCondition<Event>() {
+    @Override
+    public boolean filter(Event value) throws Exception {
+        return value.getName().equals("b");
+    }
+});
+{% endhighlight %}
 <p>Will generate the following matches for an input sequence: C D A1 A2 A3 D A4 B</p>

 <p>with consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}</p>
@@ -483,31 +489,31 @@ pattern.times(2);
 <tr>
 <td><strong>allowCombinations()</strong><a name="allow_comb_java"></a></td>
 <td>
- <p>Works in conjunction with oneOrMore() and times() and imposes non-deterministic relaxed contiguity
- between the matching events (as in followedByAny()).</p>
- <p>If not applied a relaxed contiguity (as in followedBy) is used.</p>
+ <p>Works in conjunction with <code>oneOrMore()</code> and <code>times()</code> and imposes non-deterministic relaxed contiguity
+ between the matching events (as in <code>followedByAny()</code>).</p>
+ <p>If not applied a relaxed contiguity (as in <code>followedBy()</code>) is used.</p>

 <p>E.g. a pattern like:</p>
- {% highlight java %}
- Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-     @Override
-     public boolean filter(Event value) throws Exception {
-         return value.getName().equals("c");
-     }
- })
- .followedBy("middle").where(new SimpleCondition<Event>() {
-     @Override
-     public boolean filter(Event value) throws Exception {
-         return value.getName().equals("a");
-     }
- }).oneOrMore().allowCombinations()
- .followedBy("end1").where(new SimpleCondition<Event>() {
-     @Override
-     public boolean filter(Event value) throws Exception {
-         return value.getName().equals("b");
-     }
- });
- {% endhighlight %}
+{% highlight java %}
+Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+    @Override
+    public boolean filter(Event value) throws Exception {
+        return value.getName().equals("c");
+    }
+})
+.followedBy("middle").where(new SimpleCondition<Event>() {
+    @Override
+    public boolean filter(Event value) throws Exception {
+        return value.getName().equals("a");
+    }
+}).oneOrMore().allowCombinations()
+.followedBy("end1").where(new SimpleCondition<Event>() {
+    @Override
+    public boolean filter(Event value) throws Exception {
+        return value.getName().equals("b");
+    }
+});
+{% endhighlight %}
 <p>Will generate the following matches for an input sequence: C D A1 A2 A3 D A4 B</p>

 <p>with combinations enabled: {C A1 B}, {C A1 A2 B}, {C A1 A3 B}, {C A1 A4 B}, {C A1 A2 A3 B}, {C A1 A2 A4 B}, {C A1 A3 A4 B}, {C A1 A2 A3 A4 B}</p>
@@ -525,14 +531,14 @@ pattern.times(2);
 <th class="text-left" style="width: 25%">Pattern Operation</th>
 <th class="text-center">Description</th>
 </tr>
- </thead>
+ </thead>
 <tbody>

 <tr>
 <td><strong>where(condition)</strong></td>
 <td>
 <p>Defines a condition for the current pattern. To match the pattern, an event must satisfy the condition.
- Multiple consecutive where() clauses lead to their condtions being ANDed:</p>
+ Multiple consecutive where() clauses lead to their conditions being ANDed:</p>
 {% highlight scala %}
 pattern.where(event => ... /* some condition */)
 {% endhighlight %}
@@ -563,11 +569,11 @@ pattern.subtype(classOf[SubEvent])
 <td><strong>oneOrMore()</strong></td>
 <td>
 <p>Specifies that this pattern expects at least one occurrence of a matching event.</p>
- <p>By default a relaxed internal contiguity (between subsequent events) is used. For more info on the
- internal contiguity see <a href="#consecutive_scala">consecutive</a></p>
- {% highlight scala %}
- pattern.oneOrMore()
- {% endhighlight %}
+ <p>By default a relaxed internal contiguity (between subsequent events) is used. For more info on
+ internal contiguity see <a href="#consecutive_scala">consecutive</a>.</p>
+{% highlight scala %}
+pattern.oneOrMore()
+{% endhighlight %}
 </td>
 </tr>
 <tr>
@@ -575,10 +581,10 @@ pattern.subtype(classOf[SubEvent])
 <td>
 <p>Specifies that this pattern expects an exact number of occurrences of a matching event.</p>
 <p>By default a relaxed internal contiguity (between subsequent events) is used.
- For more info on the internal contiguity see <a href="#consecutive_scala">consecutive</a></p>
- {% highlight scala %}
- pattern.times(2)
- {% endhighlight %}
+ For more info on internal contiguity see <a href="#consecutive_scala">consecutive</a>.</p>
+{% highlight scala %}
+pattern.times(2)
+{% endhighlight %}
 </td>
 </tr>
 <tr>
@@ -586,25 +592,25 @@ pattern.subtype(classOf[SubEvent])
 <td>
 <p>Specifies that this pattern is optional, i.e. it may not occur at all.
 This is applicable to all aforementioned quantifiers.</p>
- {% highlight scala %}
- pattern.oneOrMore().optional()
- {% endhighlight %}
+{% highlight scala %}
+pattern.oneOrMore().optional()
+{% endhighlight %}
 </td>
 </tr>
 <tr>
 <td><strong>consecutive()</strong><a name="consecutive_scala"></a></td>
 <td>
- <p>Works in conjunction with oneOrMore() and times() and imposes strict contiguity between the matching
- events, i.e. any non-matching element breaks the match (as in next()).</p>
- <p>If not applied a relaxed contiguity (as in followedBy()) is used.</p>
+ <p>Works in conjunction with <code>oneOrMore()</code> and <code>times()</code> and imposes strict contiguity between the matching
+ events, i.e. any non-matching element breaks the match (as in <code>next()</code>).</p>
+ <p>If not applied a relaxed contiguity (as in <code>followedBy()</code>) is used.</p>

 <p>E.g. a pattern like:</p>
- {% highlight scala %}
- Pattern.begin("start").where(_.getName().equals("c"))
-   .followedBy("middle").where(_.getName().equals("a"))
-   .oneOrMore().consecutive()
-   .followedBy("end1").where(_.getName().equals("b"));
- {% endhighlight %}
+{% highlight scala %}
+Pattern.begin("start").where(_.getName().equals("c"))
+  .followedBy("middle").where(_.getName().equals("a"))
+  .oneOrMore().consecutive()
+  .followedBy("end1").where(_.getName().equals("b"));
+{% endhighlight %}

 <p>Will generate the following matches for an input sequence: C D A1 A2 A3 D A4 B</p>

@@ -615,17 +621,17 @@ pattern.subtype(classOf[SubEvent])
 <tr>
 <td><strong>allowCombinations()</strong><a name="allow_comb_java"></a></td>
 <td>
- <p>Works in conjunction with oneOrMore() and times() and imposes non-deterministic relaxed contiguity
- between the matching events (as in followedByAny()).</p>
- <p>If not applied a relaxed contiguity (as in followedBy) is used.</p>
+ <p>Works in conjunction with <code>oneOrMore()</code> and <code>times()</code> and imposes non-deterministic relaxed contiguity
+ between the matching events (as in <code>followedByAny()</code>).</p>
+ <p>If not applied a relaxed contiguity (as in <code>followedBy()</code>) is used.</p>

 <p>E.g. a pattern like:</p>
- {% highlight scala %}
- Pattern.begin("start").where(_.getName().equals("c"))
-   .followedBy("middle").where(_.getName().equals("a"))
-   .oneOrMore().allowCombinations()
-   .followedBy("end1").where(_.getName().equals("b"));
- {% endhighlight %}
+{% highlight scala %}
+Pattern.begin("start").where(_.getName().equals("c"))
+  .followedBy("middle").where(_.getName().equals("a"))
+  .oneOrMore().allowCombinations()
+  .followedBy("end1").where(_.getName().equals("b"));
+{% endhighlight %}

 <p>Will generate the following matches for an input sequence: C D A1 A2 A3 D A4 B</p>

@@ -661,7 +667,7 @@ val start : Pattern[Event, _] = Pattern.begin("start")
 </div>

 Next, you can append more patterns to your pattern sequence by specifying the desired *contiguity conditions* between
-them. In the [previous paragraph](#conditions-on-contiguity), we described the different contiguity modes supported by
+them. In the [previous section](#conditions-on-contiguity) we described the different contiguity modes supported by
 Flink, namely *strict*, *relaxed*, and *non-deterministic relaxed*, and how to apply them in looping patterns. To apply
 them between consecutive patterns, you can use:

@@ -674,9 +680,9 @@ or
 1. `notNext()`, if you do not want an event type to directly follow another
 2. `notFollowedBy()`, if you do not want an event type to be anywhere between two other event types

-<span class="label label-danger">Attention</span> A pattern sequence cannot end in `notFollowedBy()`.
+{% warn Attention %} A pattern sequence cannot end in `notFollowedBy()`.

-<span class="label label-danger">Attention</span> A `NOT` pattern cannot be preceded by an optional one.
+{% warn Attention %} A `NOT` pattern cannot be preceded by an optional one.

 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -724,7 +730,7 @@ val relaxedNot: Pattern[Event, _] = start.notFollowedBy("not").where(...)
 Bear in mind that relaxed contiguity means that only the first succeeding matching event will be matched, while
 with non-deterministic relaxed contiguity, multiple matches will be emitted for the same beginning. As an example,
-a pattern `a b`, given the event sequence `"a", "c", "b1", "b2"` will give the following results:
+a pattern `a b`, given the event sequence `"a", "c", "b1", "b2"`, will give the following results:

 1. Strict Contiguity between `a` and `b`: `{}` (no match) -- the `"c"` after `"a"` causes `"a"` to be discarded.

@@ -737,7 +743,7 @@ Finally, it is also possible to define a temporal constraint for the pattern to
 For example, you can define that a pattern should occur within 10 seconds via the `pattern.within()` method.
 Temporal patterns are supported for both [processing and event time]({{site.baseurl}}/dev/event_time.html).

-<span class="label label-danger">Attention</span> A pattern sequence can only have one temporal constraint. If
+{% warn Attention %} A pattern sequence can only have one temporal constraint. If
 multiple such constraints are defined on different individual patterns, then the smallest one is applied.

 <div class="codetabs" markdown="1">
@@ -811,9 +817,9 @@ Pattern<Event, ?> followedByAny = start.followedByAny("middle");
 <td>
 <p>Appends a new negative pattern. A matching (negative) event has to directly succeed the
 previous matching event (strict contiguity) for the partial match to be discarded:</p>
- {% highlight java %}
- Pattern<Event, ?> notNext = start.notNext("not");
- {% endhighlight %}
+{% highlight java %}
+Pattern<Event, ?> notNext = start.notNext("not");
+{% endhighlight %}
 </td>
 </tr>
 <tr>
@@ -822,9 +828,9 @@ Pattern<Event, ?> followedByAny = start.followedByAny("middle");
 <p>Appends a new negative pattern. A partial matching event sequence will be discarded even
 if other events occur between the matching (negative) event and the previous matching event
 (relaxed contiguity):</p>
- {% highlight java %}
- Pattern<Event, ?> notFollowedBy = start.notFllowedBy("not");
- {% endhighlight %}
+{% highlight java %}
+Pattern<Event, ?> notFollowedBy = start.notFllowedBy("not");
+{% endhighlight %}
 </td>
 </tr>
 <tr>
@@ -885,9 +891,9 @@ val followedBy = start.followedBy("middle")
 <p>Appends a new pattern. Other events can occur between a matching event and the previous
 matching event, and alternative matches will be presented for every alternative matching event
 (non-deterministic relaxed contiguity):</p>
- {% highlight scala %}
- val followedByAny = start.followedByAny("middle");
- {% endhighlight %}
+{% highlight scala %}
+val followedByAny = start.followedByAny("middle");
+{% endhighlight %}
 </td>
 </tr>

@@ -896,9 +902,9 @@ val followedBy = start.followedBy("middle")
 <td>
 <p>Appends a new negative pattern. A matching (negative) event has to directly succeed the
 previous matching event (strict contiguity) for the partial match to be discarded:</p>
- {% highlight scala %}
- val notNext = start.notNext("not")
- {% endhighlight %}
+{% highlight scala %}
+val notNext = start.notNext("not")
+{% endhighlight %}
 </td>
 </tr>
 <tr>
@@ -907,9 +913,9 @@ val followedBy = start.followedBy("middle")
 <p>Appends a new negative pattern. A partial matching event sequence will be discarded even
 if other events occur between the matching (negative) event and the previous matching event
 (relaxed contiguity):</p>
- {% highlight scala %}
- val notFollowedBy = start.notFllowedBy("not")
- {% endhighlight %}
+{% highlight scala %}
+val notFollowedBy = start.notFllowedBy("not")
+{% endhighlight %}
 </td>
 </tr>

@@ -923,7 +929,6 @@ pattern.within(Time.seconds(10))
 {% endhighlight %}
 </td>
 </tr>
- <tr>
 </tbody>
 </table>
 </div>
@@ -958,7 +963,7 @@ val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)

 The input stream can be *keyed* or *non-keyed* depending on your use-case.

-<span class="label label-danger">Attention</span> Applying your pattern on a non-keyed stream will result is a job with
+{% warn Attention %} Applying your pattern on a non-keyed stream will result in a job with
 parallelism equal to 1.

 ### Selecting from Patterns
@@ -1088,7 +1093,7 @@ DataStream[Either[TimeoutEvent, ComplexEvent]] result = patternStream.select{
 {% endhighlight %}

 The `flatSelect` API call offers the same overloaded version which takes as the first parameter a timeout function and as second parameter a selection function.
-In contrast to the `select` functions, the `flatSelect` functions are called with an `Collector`.
+In contrast to the `select` functions, the `flatSelect` functions are called with a `Collector`.
 The collector can be used to emit an arbitrary number of events.

 {% highlight scala %}
@@ -1113,8 +1118,7 @@ when working in event time, an incoming element is initially put in a buffer whe
 order based on their timestamp*, and when a watermark arrives, all the elements in this buffer with timestamps smaller
 than that of the watermark are processed. This implies that elements between watermarks are processed in event-time order.

-<span class="label label-danger">Attention</span> The library assumes correctness of the watermark when working
-in event time.
+{% warn Attention %} The library assumes correctness of the watermark when working in event time.

 To also guarantee that elements across watermarks are processed in event-time order, Flink's CEP library assumes
 *correctness of the watermark*, and considers as *late* elements whose timestamp is smaller than that of the last
