Repository: flink Updated Branches: refs/heads/master 7215f9d3e -> 68cb25d5a
[FLINK-7280] Wrong clearing SharedBuffer of Equal elements with same Timestamp This closes #4406 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/68cb25d5 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/68cb25d5 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/68cb25d5 Branch: refs/heads/master Commit: 68cb25d5a5ebcc22719b7d4fc711a0e32518c877 Parents: 7215f9d Author: Dawid Wysakowicz <dwysakow...@apache.org> Authored: Thu Jul 27 10:45:17 2017 +0200 Committer: Dawid Wysakowicz <dwysakow...@apache.org> Committed: Wed Aug 2 10:17:42 2017 +0200 ---------------------------------------------------------------------- .../main/java/org/apache/flink/cep/nfa/NFA.java | 4 +- .../apache/flink/cep/nfa/SameElementITCase.java | 91 ++++++++++++++++++++ .../flink/cep/operator/CEPOperatorTest.java | 11 ++- 3 files changed, 102 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/68cb25d5/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java index f561be4..2f6f02e 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java @@ -289,7 +289,7 @@ public class NFA<T> implements Serializable { newComputationState.getPreviousState().getName()), newComputationState.getEvent(), newComputationState.getTimestamp(), - computationState.getCounter()); + newComputationState.getCounter()); } else if (newComputationState.isStopState()) { //reached stop state. release entry for the stop state shouldDiscardPath = true; @@ -298,7 +298,7 @@ public class NFA<T> implements Serializable { newComputationState.getPreviousState().getName()), newComputationState.getEvent(), newComputationState.getTimestamp(), - computationState.getCounter()); + newComputationState.getCounter()); } else { // add new computation state; it will be processed once the next event arrives statesToRetain.add(newComputationState); http://git-wip-us.apache.org/repos/asf/flink/blob/68cb25d5/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java index d378a74..183cb6d 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java @@ -26,6 +26,7 @@ import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.TestLogger; +import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import org.junit.Test; @@ -34,6 +35,7 @@ import java.util.List; import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps; import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA; +import static org.junit.Assert.assertTrue; /** * Tests for handling Events that are equal in case of {@link Object#equals(Object)} and have same timestamps. @@ -99,6 +101,95 @@ public class SameElementITCase extends TestLogger { )); } +@Test +public void testClearingBuffer() throws Exception { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event a1 = new Event(40, "a", 1.0); + Event b1 = new Event(41, "b", 2.0); + Event c1 = new Event(41, "c", 2.0); + Event d = new Event(41, "d", 2.0); + + inputEvents.add(new StreamRecord<>(a1, 1)); + inputEvents.add(new StreamRecord<>(b1, 2)); + inputEvents.add(new StreamRecord<>(c1, 2)); + inputEvents.add(new StreamRecord<>(d, 2)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("a").where(new SimpleCondition<Event>() { + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).followedBy("b").where(new SimpleCondition<Event>() { + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }).followedBy("c").where(new SimpleCondition<Event>() { + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("d").where(new SimpleCondition<Event>() { + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(a1, b1, c1, d) + )); + assertTrue(nfa.isEmpty()); +} + +@Test +public void testClearingBufferWithUntilAtTheEnd() throws Exception { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event a1 = new Event(40, "a", 1.0); + Event d1 = new Event(41, "d", 2.0); + Event d2 = new Event(41, "d", 2.0); + Event d3 = new Event(41, "d", 2.0); + Event d4 = new Event(41, "d", 2.0); + + inputEvents.add(new StreamRecord<>(a1, 1)); + inputEvents.add(new StreamRecord<>(d1, 2)); + inputEvents.add(new StreamRecord<>(d2, 2)); + inputEvents.add(new StreamRecord<>(d3, 2)); + inputEvents.add(new StreamRecord<>(d4, 4)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("a").where(new SimpleCondition<Event>() { + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).followedBy("d").where(new SimpleCondition<Event>() { + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }).oneOrMore().until(new IterativeCondition<Event>() { + @Override + public boolean filter(Event value, Context<Event> ctx) throws Exception { + return Iterators.size(ctx.getEventsForPattern("d").iterator()) == 3; + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(a1, d1, d2, d3), + Lists.newArrayList(a1, d1, d2), + Lists.newArrayList(a1, d1) + )); + assertTrue(nfa.isEmpty()); +} + @Test public void testZeroOrMoreSameElement() { List<StreamRecord<Event>> inputEvents = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/flink/blob/68cb25d5/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java index b98d241..674365c 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java @@ -44,6 +44,7 @@ import org.apache.flink.types.Either; import org.apache.flink.util.TestLogger; import com.google.common.collect.Lists; +import org.junit.After; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -61,6 +62,7 @@ import java.util.Queue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.validateMockitoUsage; /** * Tests for {@link KeyedCEPPatternOperator} and {@link TimeoutKeyedCEPPatternOperator}. @@ -70,6 +72,11 @@ public class CEPOperatorTest extends TestLogger { @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); + @After + public void validate() { + validateMockitoUsage(); + } + @Test public void testKeyedCEPOperatorWatermarkForwarding() throws Exception { @@ -471,7 +478,7 @@ public class CEPOperatorTest extends TestLogger { harness.processElement(new StreamRecord<>(endEvent, 4L)); // verify the number of invocations NFA is updated - Mockito.verify(nfaOperatorStateSpy, Mockito.times(3)).update(Mockito.any()); + Mockito.verify(nfaOperatorStateSpy, Mockito.times(2)).update(Mockito.any()); // get and verify the output Queue<Object> result = harness.getOutput(); @@ -518,7 +525,7 @@ public class CEPOperatorTest extends TestLogger { harness.processElement(new StreamRecord<>(endEvent, 4L)); // verify the number of invocations NFA is updated - Mockito.verify(nfaOperatorStateSpy, Mockito.times(3)).update(Mockito.any()); + Mockito.verify(nfaOperatorStateSpy, Mockito.times(2)).update(Mockito.any()); // get and verify the output Queue<Object> result = harness.getOutput();