[
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16119868#comment-16119868
]
ASF GitHub Bot commented on FLINK-7169:
---------------------------------------
Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/4331#discussion_r132175418
--- Diff:
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/AfterMatchSkipITCase.java
---
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
+import org.apache.flink.cep.pattern.MalformedPatternException;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * After match skip tests.
+ */
+public class AfterMatchSkipITCase extends
StreamingMultipleProgramsTestBase {
+
+ private String resultPath;
+ private String expected;
+
+ private String lateEventPath;
+ private String expectedLateEvents;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Before
+ public void before() throws Exception {
+ resultPath = tempFolder.newFile().toURI().toString();
+ expected = "";
+
+ lateEventPath = tempFolder.newFile().toURI().toString();
+ expectedLateEvents = "";
+ }
+
+ @After
+ public void after() throws Exception {
+ compareResultsByLinesInMemory(expected, resultPath);
+ compareResultsByLinesInMemory(expectedLateEvents,
lateEventPath);
+ }
+
+ private PatternSelectFunction<Event, String> newIdSelectFunction(String
... names) {
+ return new PatternSelectFunction<Event, String>() {
+
+ @Override
+ public String select(Map<String, List<Event>> pattern) {
+ StringBuilder builder = new StringBuilder();
+ for (String name: names) {
+ for (Event e : pattern.get(name)) {
+
builder.append(e.getId()).append(",");
+ }
+ }
+ return builder.toString();
+ }
+ };
+ }
+
+ @Test
+ public void testSkipToNext() throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<Event> input = env.fromElements(
+ new Event(1, "a", 0.0),
+ new Event(2, "a", 0.0),
+ new Event(3, "a", 0.0),
+ new Event(4, "a", 0.0),
+ new Event(5, "a", 0.0),
+ new Event(6, "a", 0.0)
+ );
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start",
+ new
AfterMatchSkipStrategy(AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_NEXT_EVENT))
+ .where(new SimpleCondition<Event>() {
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).times(3);
+ DataStream<String> result = CEP.pattern(input,
pattern).select(newIdSelectFunction("start"));
+
+ result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+ // expected sequence of matching event ids
+ expected = "1,2,3,\n2,3,4,\n3,4,5,\n4,5,6,";
+
+ env.execute();
+ }
+
+ @Test
+ public void testSkipPastLast() throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<Event> input = env.fromElements(
+ new Event(1, "a", 0.0),
+ new Event(2, "a", 0.0),
+ new Event(3, "a", 0.0),
+ new Event(4, "a", 0.0),
+ new Event(5, "a", 0.0),
+ new Event(6, "a", 0.0)
+ );
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start",
+ new
AfterMatchSkipStrategy(AfterMatchSkipStrategy.SkipStrategy.SKIP_PAST_LAST_EVENT)).where(new
SimpleCondition<Event>() {
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).times(3);
+
+ DataStream<String> result = CEP.pattern(input,
pattern).select(newIdSelectFunction("start"));
+
+ result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+ // expected sequence of matching event ids
+ expected = "1,2,3,\n4,5,6,";
+
+ env.execute();
+ }
+
+ @Test
+ public void testSkipToFirst() throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<Event> input = env.fromElements(
+ new Event(1, "ab", 0.0),
+ new Event(2, "ab", 0.0),
+ new Event(3, "ab", 0.0),
+ new Event(4, "ab", 0.0),
+ new Event(5, "ab", 0.0),
+ new Event(6, "ab", 0.0),
+ new Event(7, "ab", 0.0)
+ );
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start",
+ new AfterMatchSkipStrategy(
+
AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_FIRST, "end"))
+ .where(new SimpleCondition<Event>() {
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().contains("a");
+ }
+ }).times(2).next("end").where(new SimpleCondition<Event>() {
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().contains("b");
+ }
+ }).times(2);
+
+ DataStream<String> result = CEP.pattern(input,
pattern).select(newIdSelectFunction("start", "end"));
+
+ result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+ // expected sequence of matching event ids
+ expected = "1,2,3,4,\n3,4,5,6,";
+
+ env.execute();
+ }
+
+ @Test
+ public void testSkipToLast() throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<Event> input = env.fromElements(
+ new Event(1, "ab", 0.0),
+ new Event(2, "ab", 0.0),
+ new Event(3, "ab", 0.0),
+ new Event(4, "ab", 0.0),
+ new Event(5, "ab", 0.0),
+ new Event(6, "ab", 0.0),
+ new Event(7, "ab", 0.0)
+ );
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start", new
AfterMatchSkipStrategy(
+ AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST,
"end")).where(new SimpleCondition<Event>() {
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().contains("a");
+ }
+ }).times(2).next("end").where(new SimpleCondition<Event>() {
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().contains("b");
+ }
+ }).times(2);
+ DataStream<String> result = CEP.pattern(input,
pattern).select(newIdSelectFunction("start", "end"));
+
+ result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+ // expected sequence of matching event ids
+ expected = "1,2,3,4,\n4,5,6,7,";
+
+ env.execute();
+ }
+
+ @Test(expected = MalformedPatternException.class)
+ public void testSkipToLastWithEmptyException() throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<Event> input = env.fromElements(
+ new Event(1, "ab", 0.0),
+ new Event(2, "c", 0.0),
+ new Event(3, "ab", 0.0),
+ new Event(4, "c", 0.0)
+ );
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start", new
AfterMatchSkipStrategy(
+ AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST,
"middle")).where(new SimpleCondition<Event>() {
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().contains("a");
+ }
+ }).next("middle").where(
+ new SimpleCondition<Event>() {
+
+ @Override
+ public boolean filter(Event value) throws
Exception {
+ return value.getName().contains("d");
+ }
+ }
+ ).oneOrMore().optional().next("end").where(new
SimpleCondition<Event>() {
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().contains("c");
+ }
+ });
+ DataStream<String> result = CEP.pattern(input,
pattern).select(newIdSelectFunction("start", "end"));
+
+ result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+ env.execute();
+ }
+
+ @Test(expected = MalformedPatternException.class)
+ public void testSkipToLastWithInfiniteLoopException() throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<Event> input = env.fromElements(
+ new Event(1, "ab", 0.0),
+ new Event(2, "c", 0.0),
+ new Event(3, "ab", 0.0),
+ new Event(4, "c", 0.0)
+ );
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start", new
AfterMatchSkipStrategy(
+ AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST,
"start")).where(new SimpleCondition<Event>() {
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().contains("a");
+ }
+ }).next("middle").where(
+ new SimpleCondition<Event>() {
+
+ @Override
+ public boolean filter(Event value) throws
Exception {
+ return value.getName().contains("d");
+ }
+ }
+ ).oneOrMore().optional().next("end").where(new
SimpleCondition<Event>() {
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().contains("c");
+ }
+ });
+ DataStream<String> result = CEP.pattern(input,
pattern).select(newIdSelectFunction("start", "end"));
+
+ result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+ env.execute();
+ }
+
+ @Test(expected = MalformedPatternException.class)
+ public void testSkipToFirstWithInfiniteLoopException() throws Exception
{
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<Event> input = env.fromElements(
+ new Event(1, "ab", 0.0),
+ new Event(2, "c", 0.0),
+ new Event(3, "ab", 0.0),
+ new Event(4, "c", 0.0)
+ );
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start", new
AfterMatchSkipStrategy(
+ AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_FIRST,
"middle")).where(new SimpleCondition<Event>() {
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().contains("x");
+ }
+ }).oneOrMore().optional().next("middle").where(
+ new SimpleCondition<Event>() {
+
+ @Override
+ public boolean filter(Event value) throws
Exception {
+ return value.getName().contains("b");
+ }
+ }
+ ).next("end").where(new SimpleCondition<Event>() {
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().contains("c");
+ }
+ });
+
+ DataStream<String> result = CEP.pattern(input,
pattern).select(newIdSelectFunction("middle", "end"));
+
+ result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+ env.execute();
+ }
+
+ @Test
+ public void testSkipPastLast2() throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<Event> input = env.fromElements(
+ new Event(1, "a1", 0.0),
+ new Event(2, "a2", 0.0),
+ new Event(3, "b1", 0.0),
+ new Event(4, "b2", 0.0),
+ new Event(5, "c1", 0.0),
+ new Event(6, "c2", 0.0),
+ new Event(7, "d1", 0.0),
+ new Event(8, "d2", 0.0)
+ );
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("a", new
AfterMatchSkipStrategy(
+
AfterMatchSkipStrategy.SkipStrategy.SKIP_PAST_LAST_EVENT)).where(new
SimpleCondition<Event>() {
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().contains("a");
+ }
+ }).followedByAny("b").where(
+ new SimpleCondition<Event>() {
+
+ @Override
+ public boolean filter(Event value) throws
Exception {
+ return value.getName().contains("b");
+ }
+ }
+ ).followedByAny("c").where(new SimpleCondition<Event>() {
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().contains("c");
+ }
+ })
+ .followedByAny("d").where(new SimpleCondition<Event>() {
+ @Override
+ public boolean filter(Event value) throws
Exception {
+ return value.getName().contains("d");
+ }
+ });
+
+ DataStream<String> result = CEP.pattern(input,
pattern).select(newIdSelectFunction("a", "b", "c", "d"));
+
+ result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+ // expected sequence of matching event ids
+ expected =
"1,3,5,7,\n1,3,6,7,\n1,4,5,7,\n1,4,6,7,\n2,3,5,7,\n2,3,6,7,\n2,4,5,7,\n2,4,6,7,";
+
+ env.execute();
+ }
+
+ @Test
+ public void testSkipPastLast3() throws Exception {
--- End diff --
same as above
> Support AFTER MATCH SKIP function in CEP library API
> ----------------------------------------------------
>
> Key: FLINK-7169
> URL: https://issues.apache.org/jira/browse/FLINK-7169
> Project: Flink
> Issue Type: Sub-task
> Components: CEP
> Reporter: Yueting Chen
> Assignee: Yueting Chen
> Fix For: 1.4.0
>
>
> In order to support Oracle's MATCH_RECOGNIZE on top of the CEP library, we
> need to support the AFTER MATCH SKIP function in the CEP API.
> There are four options in AFTER MATCH SKIP, listed as follows:
> 1. AFTER MATCH SKIP TO NEXT ROW: resume pattern matching at the row after the
> first row of the current match.
> 2. AFTER MATCH SKIP PAST LAST ROW: resume pattern matching at the next row
> after the last row of the current match.
> 3. AFTER MATCH SKIP TO FIRST *RPV*: resume pattern matching at the first row
> that is mapped to the row pattern variable RPV.
> 4. AFTER MATCH SKIP TO LAST *RPV*: resume pattern matching at the last row
> that is mapped to the row pattern variable RPV.
> I think we can introduce a new function in the `CEP` class, which takes an
> additional parameter of type AfterMatchSkipStrategy.
> The new API may look like this:
> {code}
> public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?>
> pattern, AfterMatchSkipStrategy afterMatchSkipStrategy)
> {code}
> We can also make `SKIP TO NEXT ROW` the default option, because that is
> how the CEP library currently behaves.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)