Repository: flink
Updated Branches:
  refs/heads/master 7215f9d3e -> 68cb25d5a


[FLINK-7280] Wrong clearing SharedBuffer of Equal elements with same Timestamp

This closes #4406


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/68cb25d5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/68cb25d5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/68cb25d5

Branch: refs/heads/master
Commit: 68cb25d5a5ebcc22719b7d4fc711a0e32518c877
Parents: 7215f9d
Author: Dawid Wysakowicz <dwysakow...@apache.org>
Authored: Thu Jul 27 10:45:17 2017 +0200
Committer: Dawid Wysakowicz <dwysakow...@apache.org>
Committed: Wed Aug 2 10:17:42 2017 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/flink/cep/nfa/NFA.java |  4 +-
 .../apache/flink/cep/nfa/SameElementITCase.java | 91 ++++++++++++++++++++
 .../flink/cep/operator/CEPOperatorTest.java     | 11 ++-
 3 files changed, 102 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/68cb25d5/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index f561be4..2f6f02e 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -289,7 +289,7 @@ public class NFA<T> implements Serializable {
                                                                        
newComputationState.getPreviousState().getName()),
                                                        
newComputationState.getEvent(),
                                                        
newComputationState.getTimestamp(),
-                                                       
computationState.getCounter());
+                                                       
newComputationState.getCounter());
                                } else if (newComputationState.isStopState()) {
                                        //reached stop state. release entry for 
the stop state
                                        shouldDiscardPath = true;
@@ -298,7 +298,7 @@ public class NFA<T> implements Serializable {
                                                                        
newComputationState.getPreviousState().getName()),
                                                        
newComputationState.getEvent(),
                                                        
newComputationState.getTimestamp(),
-                                                       
computationState.getCounter());
+                                                       
newComputationState.getCounter());
                                } else {
                                        // add new computation state; it will 
be processed once the next event arrives
                                        statesToRetain.add(newComputationState);

http://git-wip-us.apache.org/repos/asf/flink/blob/68cb25d5/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
index d378a74..183cb6d 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
 
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import org.junit.Test;
 
@@ -34,6 +35,7 @@ import java.util.List;
 
 import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
 import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests for handling Events that are equal in case of {@link 
Object#equals(Object)} and have same timestamps.
@@ -99,6 +101,95 @@ public class SameElementITCase extends TestLogger {
                ));
        }
 
+@Test
+public void testClearingBuffer() throws Exception {
+       List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+       Event a1 = new Event(40, "a", 1.0);
+       Event b1 = new Event(41, "b", 2.0);
+       Event c1 = new Event(41, "c", 2.0);
+       Event d = new Event(41, "d", 2.0);
+
+       inputEvents.add(new StreamRecord<>(a1, 1));
+       inputEvents.add(new StreamRecord<>(b1, 2));
+       inputEvents.add(new StreamRecord<>(c1, 2));
+       inputEvents.add(new StreamRecord<>(d, 2));
+
+       Pattern<Event, ?> pattern = Pattern.<Event>begin("a").where(new 
SimpleCondition<Event>() {
+               @Override
+               public boolean filter(Event value) throws Exception {
+                       return value.getName().equals("a");
+               }
+       }).followedBy("b").where(new SimpleCondition<Event>() {
+               @Override
+               public boolean filter(Event value) throws Exception {
+                       return value.getName().equals("b");
+               }
+       }).followedBy("c").where(new SimpleCondition<Event>() {
+               @Override
+               public boolean filter(Event value) throws Exception {
+                       return value.getName().equals("c");
+               }
+       }).followedBy("d").where(new SimpleCondition<Event>() {
+               @Override
+               public boolean filter(Event value) throws Exception {
+                       return value.getName().equals("d");
+               }
+       });
+
+       NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+       List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+       compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+               Lists.newArrayList(a1, b1, c1, d)
+       ));
+       assertTrue(nfa.isEmpty());
+}
+
+@Test
+public void testClearingBufferWithUntilAtTheEnd() throws Exception {
+       List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+       Event a1 = new Event(40, "a", 1.0);
+       Event d1 = new Event(41, "d", 2.0);
+       Event d2 = new Event(41, "d", 2.0);
+       Event d3 = new Event(41, "d", 2.0);
+       Event d4 = new Event(41, "d", 2.0);
+
+       inputEvents.add(new StreamRecord<>(a1, 1));
+       inputEvents.add(new StreamRecord<>(d1, 2));
+       inputEvents.add(new StreamRecord<>(d2, 2));
+       inputEvents.add(new StreamRecord<>(d3, 2));
+       inputEvents.add(new StreamRecord<>(d4, 4));
+
+       Pattern<Event, ?> pattern = Pattern.<Event>begin("a").where(new 
SimpleCondition<Event>() {
+               @Override
+               public boolean filter(Event value) throws Exception {
+                       return value.getName().equals("a");
+               }
+       }).followedBy("d").where(new SimpleCondition<Event>() {
+               @Override
+               public boolean filter(Event value) throws Exception {
+                       return value.getName().equals("d");
+               }
+       }).oneOrMore().until(new IterativeCondition<Event>() {
+               @Override
+               public boolean filter(Event value, Context<Event> ctx) throws 
Exception {
+                       return 
Iterators.size(ctx.getEventsForPattern("d").iterator()) == 3;
+               }
+       });
+
+       NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+       List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+       compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+               Lists.newArrayList(a1, d1, d2, d3),
+               Lists.newArrayList(a1, d1, d2),
+               Lists.newArrayList(a1, d1)
+       ));
+       assertTrue(nfa.isEmpty());
+}
+
        @Test
        public void testZeroOrMoreSameElement() {
                List<StreamRecord<Event>> inputEvents = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/68cb25d5/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index b98d241..674365c 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -44,6 +44,7 @@ import org.apache.flink.types.Either;
 import org.apache.flink.util.TestLogger;
 
 import com.google.common.collect.Lists;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -61,6 +62,7 @@ import java.util.Queue;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.validateMockitoUsage;
 
 /**
  * Tests for {@link KeyedCEPPatternOperator} and {@link 
TimeoutKeyedCEPPatternOperator}.
@@ -70,6 +72,11 @@ public class CEPOperatorTest extends TestLogger {
        @Rule
        public TemporaryFolder tempFolder = new TemporaryFolder();
 
+       @After
+       public void validate() {
+               validateMockitoUsage();
+       }
+
        @Test
        public void testKeyedCEPOperatorWatermarkForwarding() throws Exception {
 
@@ -471,7 +478,7 @@ public class CEPOperatorTest extends TestLogger {
                        harness.processElement(new StreamRecord<>(endEvent, 
4L));
 
                        // verify the number of invocations NFA is updated
-                       Mockito.verify(nfaOperatorStateSpy, 
Mockito.times(3)).update(Mockito.any());
+                       Mockito.verify(nfaOperatorStateSpy, 
Mockito.times(2)).update(Mockito.any());
 
                        // get and verify the output
                        Queue<Object> result = harness.getOutput();
@@ -518,7 +525,7 @@ public class CEPOperatorTest extends TestLogger {
                        harness.processElement(new StreamRecord<>(endEvent, 
4L));
 
                        // verify the number of invocations NFA is updated
-                       Mockito.verify(nfaOperatorStateSpy, 
Mockito.times(3)).update(Mockito.any());
+                       Mockito.verify(nfaOperatorStateSpy, 
Mockito.times(2)).update(Mockito.any());
 
                        // get and verify the output
                        Queue<Object> result = harness.getOutput();

Reply via email to