[FLINK-7061] [cep] Fix quantifier range starting from 0

This closes #4242


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3fc96cd1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3fc96cd1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3fc96cd1

Branch: refs/heads/master
Commit: 3fc96cd1f9564a60ba5ec7f06a1fec4ab173b200
Parents: 3096bd0
Author: Dian Fu <fudian...@alibaba-inc.com>
Authored: Sun Jul 2 13:11:05 2017 +0800
Committer: Dawid Wysakowicz <dwysakow...@apache.org>
Committed: Wed Jul 5 11:53:59 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/cep/pattern/Pattern.java   |  1 +
 .../apache/flink/cep/pattern/Quantifier.java    |  3 +-
 .../apache/flink/cep/nfa/TimesRangeITCase.java  | 51 ++++++++++++++++++++
 .../apache/flink/cep/pattern/PatternTest.java   | 10 ++++
 4 files changed, 63 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3fc96cd1/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index f4d3404..2ffbc41 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -368,6 +368,7 @@ public class Pattern<T, F extends T> {
                this.quantifier = Quantifier.times(quantifier.getConsumingStrategy());
                if (from == 0) {
                        this.quantifier.optional();
+                       from = 1;
                }
                this.times = Times.of(from, to);
                return this;

http://git-wip-us.apache.org/repos/asf/flink/blob/3fc96cd1/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
index c1893b4..9192a13 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
@@ -153,9 +153,8 @@ public class Quantifier {
                private final int to;
 
                private Times(int from, int to) {
-                       Preconditions.checkArgument(from >= 0, "The from should be a non-negative number greater than or equal to 0.");
+                       Preconditions.checkArgument(from > 0, "The from should be a positive number greater than 0.");
                        Preconditions.checkArgument(to >= from, "The to should be a number greater than or equal to from: " + from + ".");
-                       Preconditions.checkArgument(from != to || from != 0, "The from and to should not be both equal to 0.");
                        this.from = from;
                        this.to = to;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/3fc96cd1/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
index 4305fa2..37a9534 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
@@ -92,6 +92,57 @@ public class TimesRangeITCase extends TestLogger {
        }
 
        @Test
+       public void testTimesRangeFromZero() {
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+               Event startEvent = new Event(40, "c", 1.0);
+               Event middleEvent1 = new Event(41, "a", 2.0);
+               Event middleEvent2 = new Event(42, "a", 3.0);
+               Event middleEvent3 = new Event(43, "a", 4.0);
+               Event end1 = new Event(44, "b", 5.0);
+
+               inputEvents.add(new StreamRecord<>(startEvent, 1));
+               inputEvents.add(new StreamRecord<>(middleEvent1, 2));
+               inputEvents.add(new StreamRecord<>(middleEvent2, 3));
+               inputEvents.add(new StreamRecord<>(middleEvent3, 4));
+               inputEvents.add(new StreamRecord<>(end1, 6));
+
+               Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               }).next("middle").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               }).times(0, 2).allowCombinations().followedBy("end1").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("b");
+                       }
+               });
+
+               NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+               final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                       Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end1),
+                       Lists.newArrayList(startEvent, middleEvent1, middleEvent3, end1),
+                       Lists.newArrayList(startEvent, middleEvent1, end1),
+                       Lists.newArrayList(startEvent, end1)
+               ));
+       }
+
+       @Test
        public void testTimesRangeNonStrict() {
                List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3fc96cd1/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
index 999e5f3..6d93ff3 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
@@ -195,6 +195,16 @@ public class PatternTest extends TestLogger {
                assertEquals(previous2.getName(), "start");
        }
 
+       @Test(expected = IllegalArgumentException.class)
+       public void testPatternTimesNegativeTimes() throws Exception {
+               Pattern.begin("start").where(dummyCondition()).times(-1);
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testPatternTimesNegativeFrom() throws Exception {
+               Pattern.begin("start").where(dummyCondition()).times(-1, 2);
+       }
+
        @Test(expected = MalformedPatternException.class)
        public void testPatternCanHaveQuantifierSpecifiedOnce1() throws Exception {
 

Reply via email to