[FLINK-7061] [cep] Fix quantifier range starting from 0. This closes #4242.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3fc96cd1 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3fc96cd1 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3fc96cd1 Branch: refs/heads/master Commit: 3fc96cd1f9564a60ba5ec7f06a1fec4ab173b200 Parents: 3096bd0 Author: Dian Fu <fudian...@alibaba-inc.com> Authored: Sun Jul 2 13:11:05 2017 +0800 Committer: Dawid Wysakowicz <dwysakow...@apache.org> Committed: Wed Jul 5 11:53:59 2017 +0200 ---------------------------------------------------------------------- .../org/apache/flink/cep/pattern/Pattern.java | 1 + .../apache/flink/cep/pattern/Quantifier.java | 3 +- .../apache/flink/cep/nfa/TimesRangeITCase.java | 51 ++++++++++++++++++++ .../apache/flink/cep/pattern/PatternTest.java | 10 ++++ 4 files changed, 63 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3fc96cd1/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java index f4d3404..2ffbc41 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java @@ -368,6 +368,7 @@ public class Pattern<T, F extends T> { this.quantifier = Quantifier.times(quantifier.getConsumingStrategy()); if (from == 0) { this.quantifier.optional(); + from = 1; } this.times = Times.of(from, to); return this; http://git-wip-us.apache.org/repos/asf/flink/blob/3fc96cd1/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java ---------------------------------------------------------------------- 
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java index c1893b4..9192a13 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java @@ -153,9 +153,8 @@ public class Quantifier { private final int to; private Times(int from, int to) { - Preconditions.checkArgument(from >= 0, "The from should be a non-negative number greater than or equal to 0."); + Preconditions.checkArgument(from > 0, "The from should be a positive number greater than 0."); Preconditions.checkArgument(to >= from, "The to should be a number greater than or equal to from: " + from + "."); - Preconditions.checkArgument(from != to || from != 0, "The from and to should not be both equal to 0."); this.from = from; this.to = to; } http://git-wip-us.apache.org/repos/asf/flink/blob/3fc96cd1/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java index 4305fa2..37a9534 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java @@ -92,6 +92,57 @@ public class TimesRangeITCase extends TestLogger { } @Test + public void testTimesRangeFromZero() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event startEvent = new Event(40, "c", 1.0); + Event middleEvent1 = new Event(41, "a", 2.0); + Event middleEvent2 = new Event(42, "a", 3.0); + Event middleEvent3 = new Event(43, "a", 4.0); + Event end1 = new Event(44, "b", 5.0); 
+ + inputEvents.add(new StreamRecord<>(startEvent, 1)); + inputEvents.add(new StreamRecord<>(middleEvent1, 2)); + inputEvents.add(new StreamRecord<>(middleEvent2, 3)); + inputEvents.add(new StreamRecord<>(middleEvent3, 4)); + inputEvents.add(new StreamRecord<>(end1, 6)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).next("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(0, 2).allowCombinations().followedBy("end1").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end1), + Lists.newArrayList(startEvent, middleEvent1, middleEvent3, end1), + Lists.newArrayList(startEvent, middleEvent1, end1), + Lists.newArrayList(startEvent, end1) + )); + } + + @Test public void testTimesRangeNonStrict() { List<StreamRecord<Event>> inputEvents = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/flink/blob/3fc96cd1/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java index 999e5f3..6d93ff3 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java @@ -195,6 +195,16 @@ public class PatternTest extends TestLogger { assertEquals(previous2.getName(), "start"); } + @Test(expected = IllegalArgumentException.class) + public void testPatternTimesNegativeTimes() throws Exception { + Pattern.begin("start").where(dummyCondition()).times(-1); + } + + @Test(expected = IllegalArgumentException.class) + public void testPatternTimesNegativeFrom() throws Exception { + Pattern.begin("start").where(dummyCondition()).times(-1, 2); + } + @Test(expected = MalformedPatternException.class) public void testPatternCanHaveQuantifierSpecifiedOnce1() throws Exception {