Repository: flink
Updated Branches:
  refs/heads/master 9244106b3 -> 00ce3f1b1


http://git-wip-us.apache.org/repos/asf/flink/blob/ae9c9d06/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index 2cc67e5..46e2fd4 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -19,7 +19,6 @@
 package org.apache.flink.cep.nfa;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 import com.google.common.primitives.Doubles;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.Event;
@@ -156,22 +155,11 @@ public class NFAITCase extends TestLogger {
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
-               List<Map<String, Event>> resultingPatterns = new ArrayList<>();
-
-               for (StreamRecord<Event> inputEvent: inputEvents) {
-                       Collection<Map<String, Event>> patterns = nfa.process(
-                               inputEvent.getValue(),
-                               inputEvent.getTimestamp()).f0;
-
-                       resultingPatterns.addAll(patterns);
-               }
-
-               assertEquals(1, resultingPatterns.size());
-               Map<String, Event> patternMap = resultingPatterns.get(0);
+               List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
-               assertEquals(startEvent, patternMap.get("start"));
-               assertEquals(middleEvent, patternMap.get("middle"));
-               assertEquals(endEvent, patternMap.get("end"));
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                               Lists.newArrayList(startEvent, middleEvent, 
endEvent)
+               ));
        }
 
        @Test
@@ -202,24 +190,11 @@ public class NFAITCase extends TestLogger {
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
-               Set<Set<Event>> resultingPatterns = new HashSet<>();
-               List<Collection<Event>> allPatterns = new ArrayList<>();
-
-               for (StreamRecord<Event> inputEvent : inputEvents) {
-                       Collection<Map<String, Event>> patterns = nfa.process(
-                               inputEvent.getValue(),
-                               inputEvent.getTimestamp()).f0;
-
-                       for (Map<String, Event> foundPattern : patterns) {
-                               resultingPatterns.add(new 
HashSet<>(foundPattern.values()));
-                               allPatterns.add(foundPattern.values());
-                       }
-               }
+               List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
-               assertEquals(1, allPatterns.size());
-               assertEquals(Sets.<Set<Event>>newHashSet(
-                       Sets.newHashSet(middleEvent1, end)
-               ), resultingPatterns);
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                               Lists.newArrayList(middleEvent1, end)
+               ));
        }
 
        @Test
@@ -252,19 +227,9 @@ public class NFAITCase extends TestLogger {
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
-               Set<Set<Event>> resultingPatterns = new HashSet<>();
-
-               for (StreamRecord<Event> inputEvent : inputEvents) {
-                       Collection<Map<String, Event>> patterns = nfa.process(
-                               inputEvent.getValue(),
-                               inputEvent.getTimestamp()).f0;
-
-                       for (Map<String, Event> foundPattern : patterns) {
-                               resultingPatterns.add(new 
HashSet<>(foundPattern.values()));
-                       }
-               }
+               List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
-               assertEquals(Sets.newHashSet(), resultingPatterns);
+               compareMaps(resultingPatterns, 
Lists.<List<Event>>newArrayList());
        }
 
        /**
@@ -274,7 +239,6 @@ public class NFAITCase extends TestLogger {
        @Test
        public void testSimplePatternWithTimeWindowNFA() {
                List<StreamRecord<Event>> events = new ArrayList<>();
-               List<Map<String, Event>> resultingPatterns = new ArrayList<>();
 
                final Event startEvent;
                final Event middleEvent;
@@ -313,21 +277,11 @@ public class NFAITCase extends TestLogger {
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
-               for (StreamRecord<Event> event: events) {
-                       Collection<Map<String, Event>> patterns = nfa.process(
-                                       event.getValue(),
-                                       event.getTimestamp()).f0;
-
-                       resultingPatterns.addAll(patterns);
-               }
-
-               assertEquals(1, resultingPatterns.size());
-
-               Map<String, Event> patternMap = resultingPatterns.get(0);
+               List<List<Event>> resultingPatterns = feedNFA(events, nfa);
 
-               assertEquals(startEvent, patternMap.get("start"));
-               assertEquals(middleEvent, patternMap.get("middle"));
-               assertEquals(endEvent, patternMap.get("end"));
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                               Lists.newArrayList(startEvent, middleEvent, 
endEvent)
+               ));
        }
 
        /**
@@ -337,9 +291,9 @@ public class NFAITCase extends TestLogger {
        @Test
        public void testSimplePatternWithTimeoutHandling() {
                List<StreamRecord<Event>> events = new ArrayList<>();
-               List<Map<String, Event>> resultingPatterns = new ArrayList<>();
-               Set<Tuple2<Map<String, Event>, Long>> resultingTimeoutPatterns 
= new HashSet<>();
-               Set<Tuple2<Map<String, Event>, Long>> expectedTimeoutPatterns = 
new HashSet<>();
+               List<Map<String, List<Event>>> resultingPatterns = new 
ArrayList<>();
+               Set<Tuple2<Map<String, List<Event>>, Long>> 
resultingTimeoutPatterns = new HashSet<>();
+               Set<Tuple2<Map<String, List<Event>>, Long>> 
expectedTimeoutPatterns = new HashSet<>();
 
                events.add(new StreamRecord<>(new Event(1, "start", 1.0), 1));
                events.add(new StreamRecord<>(new Event(2, "start", 1.0), 2));
@@ -348,19 +302,19 @@ public class NFAITCase extends TestLogger {
                events.add(new StreamRecord<>(new Event(5, "end", 1.0), 11));
                events.add(new StreamRecord<>(new Event(6, "end", 1.0), 13));
 
-               Map<String, Event> timeoutPattern1 = new HashMap<>();
-               timeoutPattern1.put("start", new Event(1, "start", 1.0));
-               timeoutPattern1.put("middle", new Event(3, "middle", 1.0));
+               Map<String, List<Event>> timeoutPattern1 = new HashMap<>();
+               timeoutPattern1.put("start", Collections.singletonList(new 
Event(1, "start", 1.0)));
+               timeoutPattern1.put("middle", Collections.singletonList(new 
Event(3, "middle", 1.0)));
 
-               Map<String, Event> timeoutPattern2 = new HashMap<>();
-               timeoutPattern2.put("start", new Event(2, "start", 1.0));
-               timeoutPattern2.put("middle", new Event(3, "middle", 1.0));
+               Map<String, List<Event>> timeoutPattern2 = new HashMap<>();
+               timeoutPattern2.put("start", Collections.singletonList(new 
Event(2, "start", 1.0)));
+               timeoutPattern2.put("middle", Collections.singletonList(new 
Event(3, "middle", 1.0)));
 
-               Map<String, Event> timeoutPattern3 = new HashMap<>();
-               timeoutPattern3.put("start", new Event(1, "start", 1.0));
+               Map<String, List<Event>> timeoutPattern3 = new HashMap<>();
+               timeoutPattern3.put("start", Collections.singletonList(new 
Event(1, "start", 1.0)));
 
-               Map<String, Event> timeoutPattern4 = new HashMap<>();
-               timeoutPattern4.put("start", new Event(2, "start", 1.0));
+               Map<String, List<Event>> timeoutPattern4 = new HashMap<>();
+               timeoutPattern4.put("start", Collections.singletonList(new 
Event(2, "start", 1.0)));
 
                expectedTimeoutPatterns.add(Tuple2.of(timeoutPattern1, 11L));
                expectedTimeoutPatterns.add(Tuple2.of(timeoutPattern2, 13L));
@@ -393,10 +347,11 @@ public class NFAITCase extends TestLogger {
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), true);
 
                for (StreamRecord<Event> event: events) {
-                       final Tuple2<Collection<Map<String, Event>>, 
Collection<Tuple2<Map<String, Event>, Long>>> patterns = 
nfa.process(event.getValue(), event.getTimestamp());
+                       Tuple2<Collection<Map<String, List<Event>>>, 
Collection<Tuple2<Map<String, List<Event>>, Long>>> patterns =
+                                       nfa.process(event.getValue(), 
event.getTimestamp());
 
-                       Collection<Map<String, Event>> matchedPatterns = 
patterns.f0;
-                       Collection<Tuple2<Map<String, Event>, Long>> 
timeoutPatterns = patterns.f1;
+                       Collection<Map<String, List<Event>>> matchedPatterns = 
patterns.f0;
+                       Collection<Tuple2<Map<String, List<Event>>, Long>> 
timeoutPatterns = patterns.f1;
 
                        resultingPatterns.addAll(matchedPatterns);
                        resultingTimeoutPatterns.addAll(timeoutPatterns);
@@ -460,31 +415,16 @@ public class NFAITCase extends TestLogger {
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
-               List<Map<String, Event>> resultingPatterns = new ArrayList<>();
-
-               for (StreamRecord<Event> inputEvent: inputEvents) {
-                       Collection<Map<String, Event>> patterns = nfa.process(
-                               inputEvent.getValue(),
-                               inputEvent.getTimestamp()).f0;
-
-                       resultingPatterns.addAll(patterns);
-               }
-
-               assertEquals(6, resultingPatterns.size());
-
-               final Set<Set<Event>> patterns = new HashSet<>();
-               for (Map<String, Event> resultingPattern : resultingPatterns) {
-                       patterns.add(new HashSet<>(resultingPattern.values()));
-               }
+               List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
-               assertEquals(Sets.newHashSet(
-                       Sets.newHashSet(startEvent, middleEvent1, nextOne1, 
endEvent),
-                       Sets.newHashSet(startEvent, middleEvent2, nextOne1, 
endEvent),
-                       Sets.newHashSet(startEvent, middleEvent3, nextOne1, 
endEvent),
-                       Sets.newHashSet(startEvent, middleEvent1, nextOne2, 
endEvent),
-                       Sets.newHashSet(startEvent, middleEvent2, nextOne2, 
endEvent),
-                       Sets.newHashSet(startEvent, middleEvent3, nextOne2, 
endEvent)
-               ), patterns);
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                               Lists.newArrayList(startEvent, middleEvent1, 
nextOne1, endEvent),
+                               Lists.newArrayList(startEvent, middleEvent2, 
nextOne1, endEvent),
+                               Lists.newArrayList(startEvent, middleEvent3, 
nextOne1, endEvent),
+                               Lists.newArrayList(startEvent, middleEvent1, 
nextOne2, endEvent),
+                               Lists.newArrayList(startEvent, middleEvent2, 
nextOne2, endEvent),
+                               Lists.newArrayList(startEvent, middleEvent3, 
nextOne2, endEvent)
+               ));
        }
 
        @Test
@@ -548,39 +488,26 @@ public class NFAITCase extends TestLogger {
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
-               Set<Set<Event>> resultingPatterns = new HashSet<>();
-               List<Collection<Event>> allPatterns = new ArrayList<>();
-
-               for (StreamRecord<Event> inputEvent : inputEvents) {
-                       Collection<Map<String, Event>> patterns = nfa.process(
-                               inputEvent.getValue(),
-                               inputEvent.getTimestamp()).f0;
-
-                       for (Map<String, Event> foundPattern : patterns) {
-                               resultingPatterns.add(new 
HashSet<>(foundPattern.values()));
-                               allPatterns.add(foundPattern.values());
-                       }
-               }
+               List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
-               assertEquals(16, allPatterns.size());
-               assertEquals(Sets.newHashSet(
-                       Sets.newHashSet(startEvent, middleEvent1, middleEvent2, 
middleEvent3, end1, end2, end4),
-                       Sets.newHashSet(startEvent, middleEvent1, middleEvent2, 
end1, end2, end4),
-                       Sets.newHashSet(startEvent, middleEvent1, middleEvent3, 
end1, end2, end4),
-                       Sets.newHashSet(startEvent, middleEvent2, middleEvent3, 
end1, end2, end4),
-                       Sets.newHashSet(startEvent, middleEvent1, end1, end2, 
end4),
-                       Sets.newHashSet(startEvent, middleEvent2, end1, end2, 
end4),
-                       Sets.newHashSet(startEvent, middleEvent3, end1, end2, 
end4),
-                       Sets.newHashSet(startEvent, end1, end2, end4),
-                       Sets.newHashSet(startEvent, middleEvent1, middleEvent2, 
middleEvent3, end1, end3, end4),
-                       Sets.newHashSet(startEvent, middleEvent1, middleEvent2, 
end1, end3, end4),
-                       Sets.newHashSet(startEvent, middleEvent1, middleEvent3, 
end1, end3, end4),
-                       Sets.newHashSet(startEvent, middleEvent2, middleEvent3, 
end1, end3, end4),
-                       Sets.newHashSet(startEvent, middleEvent1, end1, end3, 
end4),
-                       Sets.newHashSet(startEvent, middleEvent2, end1, end3, 
end4),
-                       Sets.newHashSet(startEvent, middleEvent3, end1, end3, 
end4),
-                       Sets.newHashSet(startEvent, end1, end3, end4)
-               ), resultingPatterns);
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                               Lists.newArrayList(startEvent, middleEvent1, 
middleEvent2, middleEvent3, end1, end2, end4),
+                               Lists.newArrayList(startEvent, middleEvent1, 
middleEvent2, end1, end2, end4),
+                               Lists.newArrayList(startEvent, middleEvent1, 
middleEvent3, end1, end2, end4),
+                               Lists.newArrayList(startEvent, middleEvent2, 
middleEvent3, end1, end2, end4),
+                               Lists.newArrayList(startEvent, middleEvent1, 
end1, end2, end4),
+                               Lists.newArrayList(startEvent, middleEvent2, 
end1, end2, end4),
+                               Lists.newArrayList(startEvent, middleEvent3, 
end1, end2, end4),
+                               Lists.newArrayList(startEvent, end1, end2, 
end4),
+                               Lists.newArrayList(startEvent, middleEvent1, 
middleEvent2, middleEvent3, end1, end3, end4),
+                               Lists.newArrayList(startEvent, middleEvent1, 
middleEvent2, end1, end3, end4),
+                               Lists.newArrayList(startEvent, middleEvent1, 
middleEvent3, end1, end3, end4),
+                               Lists.newArrayList(startEvent, middleEvent2, 
middleEvent3, end1, end3, end4),
+                               Lists.newArrayList(startEvent, middleEvent1, 
end1, end3, end4),
+                               Lists.newArrayList(startEvent, middleEvent2, 
end1, end3, end4),
+                               Lists.newArrayList(startEvent, middleEvent3, 
end1, end3, end4),
+                               Lists.newArrayList(startEvent, end1, end3, end4)
+               ));
        }
 
        @Test
@@ -674,27 +601,14 @@ public class NFAITCase extends TestLogger {
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
-               Set<Set<Event>> resultingPatterns = new HashSet<>();
-               List<Collection<Event>> allPatterns = new ArrayList<>();
-
-               for (StreamRecord<Event> inputEvent : inputEvents) {
-                       Collection<Map<String, Event>> patterns = nfa.process(
-                               inputEvent.getValue(),
-                               inputEvent.getTimestamp()).f0;
-
-                       for (Map<String, Event> foundPattern : patterns) {
-                               resultingPatterns.add(new 
HashSet<>(foundPattern.values()));
-                               allPatterns.add(foundPattern.values());
-                       }
-               }
+               final List<List<Event>> resultingPatterns = 
feedNFA(inputEvents, nfa);
 
-               assertEquals(4, allPatterns.size());
-               assertEquals(Sets.newHashSet(
-                       Sets.newHashSet(startEvent, middleEvent1, middleEvent2, 
middleEvent3, end1),
-                       Sets.newHashSet(startEvent, middleEvent1, middleEvent2, 
end1),
-                       Sets.newHashSet(startEvent, middleEvent1, end1),
-                       Sets.newHashSet(startEvent, end1)
-               ), resultingPatterns);
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                               Lists.newArrayList(startEvent, middleEvent1, 
middleEvent2, middleEvent3, end1),
+                               Lists.newArrayList(startEvent, middleEvent1, 
middleEvent2, end1),
+                               Lists.newArrayList(startEvent, middleEvent1, 
end1),
+                               Lists.newArrayList(startEvent, end1)
+               ));
        }
 
        @Test
@@ -729,30 +643,17 @@ public class NFAITCase extends TestLogger {
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
-               Set<Set<Event>> resultingPatterns = new HashSet<>();
-               List<Collection<Event>> allPatterns = new ArrayList<>();
-
-               for (StreamRecord<Event> inputEvent : inputEvents) {
-                       Collection<Map<String, Event>> patterns = nfa.process(
-                               inputEvent.getValue(),
-                               inputEvent.getTimestamp()).f0;
-
-                       for (Map<String, Event> foundPattern : patterns) {
-                               resultingPatterns.add(new 
HashSet<>(foundPattern.values()));
-                               allPatterns.add(foundPattern.values());
-                       }
-               }
+               final List<List<Event>> resultingPatterns = 
feedNFA(inputEvents, nfa);
 
-               assertEquals(7, allPatterns.size());
-               assertEquals(Sets.newHashSet(
-                       Sets.newHashSet(middleEvent1, middleEvent2, 
middleEvent3, end),
-                       Sets.newHashSet(middleEvent1, middleEvent2, end),
-                       Sets.newHashSet(middleEvent2, middleEvent3, end),
-                       Sets.newHashSet(middleEvent1, end),
-                       Sets.newHashSet(middleEvent2, end),
-                       Sets.newHashSet(middleEvent3, end),
-                       Sets.newHashSet(end)
-               ), resultingPatterns);
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                               Lists.newArrayList(middleEvent1, middleEvent2, 
middleEvent3, end),
+                               Lists.newArrayList(middleEvent1, middleEvent2, 
end),
+                               Lists.newArrayList(middleEvent2, middleEvent3, 
end),
+                               Lists.newArrayList(middleEvent1, end),
+                               Lists.newArrayList(middleEvent2, end),
+                               Lists.newArrayList(middleEvent3, end),
+                               Lists.newArrayList(end)
+               ));
        }
 
        @Test
@@ -805,29 +706,16 @@ public class NFAITCase extends TestLogger {
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
-               Set<Set<Event>> resultingPatterns = new HashSet<>();
-               List<Collection<Event>> allPatterns = new ArrayList<>();
-
-               for (StreamRecord<Event> inputEvent : inputEvents) {
-                       Collection<Map<String, Event>> patterns = nfa.process(
-                               inputEvent.getValue(),
-                               inputEvent.getTimestamp()).f0;
-
-                       for (Map<String, Event> foundPattern : patterns) {
-                               resultingPatterns.add(new 
HashSet<>(foundPattern.values()));
-                               allPatterns.add(foundPattern.values());
-                       }
-               }
+               final List<List<Event>> resultingPatterns = 
feedNFA(inputEvents, nfa);
 
-               assertEquals(6, allPatterns.size());
-               assertEquals(Sets.newHashSet(
-                       Sets.newHashSet(startEvent, middleEvent1, middleEvent2, 
middleEvent3, end),
-                       Sets.newHashSet(startEvent, middleEvent1, middleEvent2, 
end),
-                       Sets.newHashSet(startEvent, middleEvent2, middleEvent3, 
end),
-                       Sets.newHashSet(startEvent, middleEvent2, end),
-                       Sets.newHashSet(startEvent, middleEvent1, end),
-                       Sets.newHashSet(startEvent, end)
-               ), resultingPatterns);
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                               Lists.newArrayList(startEvent, middleEvent1, 
middleEvent2, middleEvent3, end),
+                               Lists.newArrayList(startEvent, middleEvent1, 
middleEvent2, end),
+                               Lists.newArrayList(startEvent, middleEvent2, 
middleEvent3, end),
+                               Lists.newArrayList(startEvent, middleEvent2, 
end),
+                               Lists.newArrayList(startEvent, middleEvent1, 
end),
+                               Lists.newArrayList(startEvent, end)
+               ));
        }
 
        @Test
@@ -889,31 +777,18 @@ public class NFAITCase extends TestLogger {
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
-               Set<Set<Event>> resultingPatterns = new HashSet<>();
-               List<Collection<Event>> allPatterns = new ArrayList<>();
-
-               for (StreamRecord<Event> inputEvent : inputEvents) {
-                       Collection<Map<String, Event>> patterns = nfa.process(
-                               inputEvent.getValue(),
-                               inputEvent.getTimestamp()).f0;
-
-                       for (Map<String, Event> foundPattern : patterns) {
-                               resultingPatterns.add(new 
HashSet<>(foundPattern.values()));
-                               allPatterns.add(foundPattern.values());
-                       }
-               }
+               final List<List<Event>> resultingPatterns = 
feedNFA(inputEvents, nfa);
 
-               assertEquals(8, allPatterns.size());
-               assertEquals(Sets.newHashSet(
-                       Sets.newHashSet(startEvent, middleEvent1, merging, end),
-                       Sets.newHashSet(startEvent, middleEvent1, merging, 
kleene1, end),
-                       Sets.newHashSet(startEvent, middleEvent1, merging, 
kleene2, end),
-                       Sets.newHashSet(startEvent, middleEvent1, merging, 
kleene1, kleene2, end),
-                       Sets.newHashSet(startEvent, middleEvent2, merging, end),
-                       Sets.newHashSet(startEvent, middleEvent2, merging, 
kleene1, end),
-                       Sets.newHashSet(startEvent, middleEvent2, merging, 
kleene2, end),
-                       Sets.newHashSet(startEvent, middleEvent2, merging, 
kleene1, kleene2, end)
-               ), resultingPatterns);
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                               Lists.newArrayList(startEvent, middleEvent1, 
merging, end),
+                               Lists.newArrayList(startEvent, middleEvent1, 
merging, kleene1, end),
+                               Lists.newArrayList(startEvent, middleEvent1, 
merging, kleene2, end),
+                               Lists.newArrayList(startEvent, middleEvent1, 
merging, kleene1, kleene2, end),
+                               Lists.newArrayList(startEvent, middleEvent2, 
merging, end),
+                               Lists.newArrayList(startEvent, middleEvent2, 
merging, kleene1, end),
+                               Lists.newArrayList(startEvent, middleEvent2, 
merging, kleene2, end),
+                               Lists.newArrayList(startEvent, middleEvent2, 
merging, kleene1, kleene2, end)
+               ));
        }
 
        @Test
@@ -958,19 +833,9 @@ public class NFAITCase extends TestLogger {
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
-               Set<Set<Event>> resultingPatterns = new HashSet<>();
-
-               for (StreamRecord<Event> inputEvent : inputEvents) {
-                       Collection<Map<String, Event>> patterns = nfa.process(
-                               inputEvent.getValue(),
-                               inputEvent.getTimestamp()).f0;
-
-                       for (Map<String, Event> foundPattern : patterns) {
-                               resultingPatterns.add(new 
HashSet<>(foundPattern.values()));
-                       }
-               }
+               final List<List<Event>> resultingPatterns = 
feedNFA(inputEvents, nfa);
 
-               assertEquals(Sets.newHashSet(), resultingPatterns);
+               compareMaps(resultingPatterns, 
Lists.<List<Event>>newArrayList());
        }
 
        @Test
@@ -1059,26 +924,13 @@ public class NFAITCase extends TestLogger {
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
-               Set<Set<Event>> resultingPatterns = new HashSet<>();
-               List<Collection<Event>> allPatterns = new ArrayList<>();
-
-               for (StreamRecord<Event> inputEvent : inputEvents) {
-                       Collection<Map<String, Event>> patterns = nfa.process(
-                               inputEvent.getValue(),
-                               inputEvent.getTimestamp()).f0;
-
-                       for (Map<String, Event> foundPattern : patterns) {
-                               resultingPatterns.add(new 
HashSet<>(foundPattern.values()));
-                               allPatterns.add(foundPattern.values());
-                       }
-               }
+               final List<List<Event>> resultingPatterns = 
feedNFA(inputEvents, nfa);
 
-               assertEquals(3, allPatterns.size());
-               assertEquals(Sets.newHashSet(
-                       Sets.newHashSet(startEvent, middleEvent1, middleEvent2, 
end1),
-                       Sets.newHashSet(startEvent, middleEvent1, end1),
-                       Sets.newHashSet(startEvent, middleEvent2, end1)
-               ), resultingPatterns);
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                               Lists.newArrayList(startEvent, middleEvent1, 
middleEvent2, end1),
+                               Lists.newArrayList(startEvent, middleEvent1, 
end1),
+                               Lists.newArrayList(startEvent, middleEvent2, 
end1)
+               ));
        }
 
        @Test
@@ -1113,30 +965,17 @@ public class NFAITCase extends TestLogger {
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
-               Set<Set<Event>> resultingPatterns = new HashSet<>();
-               List<Collection<Event>> allPatterns = new ArrayList<>();
-
-               for (StreamRecord<Event> inputEvent : inputEvents) {
-                       Collection<Map<String, Event>> patterns = nfa.process(
-                               inputEvent.getValue(),
-                               inputEvent.getTimestamp()).f0;
-
-                       for (Map<String, Event> foundPattern : patterns) {
-                               resultingPatterns.add(new 
HashSet<>(foundPattern.values()));
-                               allPatterns.add(foundPattern.values());
-                       }
-               }
+               final List<List<Event>> resultingPatterns = 
feedNFA(inputEvents, nfa);
 
-               assertEquals(7, allPatterns.size());
-               assertEquals(Sets.newHashSet(
-                       Sets.newHashSet(startEvent1, startEvent2, startEvent3, 
end1),
-                       Sets.newHashSet(startEvent1, startEvent2, end1),
-                       Sets.newHashSet(startEvent1, startEvent3, end1),
-                       Sets.newHashSet(startEvent2, startEvent3, end1),
-                       Sets.newHashSet(startEvent1, end1),
-                       Sets.newHashSet(startEvent2, end1),
-                       Sets.newHashSet(startEvent3, end1)
-               ), resultingPatterns);
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                               Lists.newArrayList(startEvent1, startEvent2, 
startEvent3, end1),
+                               Lists.newArrayList(startEvent1, startEvent2, 
end1),
+                               Lists.newArrayList(startEvent1, startEvent3, 
end1),
+                               Lists.newArrayList(startEvent2, startEvent3, 
end1),
+                               Lists.newArrayList(startEvent1, end1),
+                               Lists.newArrayList(startEvent2, end1),
+                               Lists.newArrayList(startEvent3, end1)
+               ));
        }
 
        @Test
@@ -1181,24 +1020,11 @@ public class NFAITCase extends TestLogger {
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
-               Set<Set<Event>> resultingPatterns = new HashSet<>();
-               List<Collection<Event>> allPatterns = new ArrayList<>();
-
-               for (StreamRecord<Event> inputEvent : inputEvents) {
-                       Collection<Map<String, Event>> patterns = nfa.process(
-                               inputEvent.getValue(),
-                               inputEvent.getTimestamp()).f0;
-
-                       for (Map<String, Event> foundPattern : patterns) {
-                               resultingPatterns.add(new 
HashSet<>(foundPattern.values()));
-                               allPatterns.add(foundPattern.values());
-                       }
-               }
+               final List<List<Event>> resultingPatterns = 
feedNFA(inputEvents, nfa);
 
-               assertEquals(1, allPatterns.size());
-               assertEquals(Sets.<Set<Event>>newHashSet(
-                       Sets.newHashSet(startEvent, endEvent)
-               ), resultingPatterns);
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                               Lists.newArrayList(startEvent, endEvent)
+               ));
        }
 
        @Test
@@ -1291,25 +1117,12 @@ public class NFAITCase extends TestLogger {
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
-               Set<Set<Event>> resultingPatterns = new HashSet<>();
-               List<Collection<Event>> allPatterns = new ArrayList<>();
-
-               for (StreamRecord<Event> inputEvent : inputEvents) {
-                       Collection<Map<String, Event>> patterns = nfa.process(
-                               inputEvent.getValue(),
-                               inputEvent.getTimestamp()).f0;
-
-                       for (Map<String, Event> foundPattern : patterns) {
-                               resultingPatterns.add(new 
HashSet<>(foundPattern.values()));
-                               allPatterns.add(foundPattern.values());
-                       }
-               }
+               final List<List<Event>> resultingPatterns = 
feedNFA(inputEvents, nfa);
 
-               assertEquals(2, allPatterns.size());
-               assertEquals(Sets.newHashSet(
-                       Sets.newHashSet(startEvent, middleEvent, end1),
-                       Sets.newHashSet(startEvent, end1)
-               ), resultingPatterns);
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                               Lists.newArrayList(startEvent, middleEvent, 
end1),
+                               Lists.newArrayList(startEvent, end1)
+               ));
        }
 
        @Test
@@ -1602,25 +1415,12 @@ public class NFAITCase extends TestLogger {
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
-               Set<Set<Event>> resultingPatterns = new HashSet<>();
-               List<Collection<Event>> allPatterns = new ArrayList<>();
-
-               for (StreamRecord<Event> inputEvent : inputEvents) {
-                       Collection<Map<String, Event>> patterns = nfa.process(
-                               inputEvent.getValue(),
-                               inputEvent.getTimestamp()).f0;
-
-                       for (Map<String, Event> foundPattern : patterns) {
-                               resultingPatterns.add(new 
HashSet<>(foundPattern.values()));
-                               allPatterns.add(foundPattern.values());
-                       }
-               }
+               final List<List<Event>> resultingPatterns = 
feedNFA(inputEvents, nfa);
 
-               assertEquals(2, allPatterns.size());
-               assertEquals(Sets.newHashSet(
-                       Sets.newHashSet(startEvent,  end1),
-                       Sets.newHashSet(end1)
-               ), resultingPatterns);
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                               Lists.newArrayList(startEvent,  end1),
+                               Lists.newArrayList(end1)
+               ));
        }
 
        @Test
@@ -1655,27 +1455,14 @@ public class NFAITCase extends TestLogger {
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
-               Set<Set<Event>> resultingPatterns = new HashSet<>();
-               List<Collection<Event>> allPatterns = new ArrayList<>();
-
-               for (StreamRecord<Event> inputEvent : inputEvents) {
-                       Collection<Map<String, Event>> patterns = nfa.process(
-                               inputEvent.getValue(),
-                               inputEvent.getTimestamp()).f0;
-
-                       for (Map<String, Event> foundPattern : patterns) {
-                               resultingPatterns.add(new 
HashSet<>(foundPattern.values()));
-                               allPatterns.add(foundPattern.values());
-                       }
-               }
+               final List<List<Event>> resultingPatterns = 
feedNFA(inputEvents, nfa);
 
-               assertEquals(4, allPatterns.size());
-               assertEquals(Sets.newHashSet(
-                       Sets.newHashSet(startEvent,  middleEvent1, 
middleEvent2, middleEvent3),
-                       Sets.newHashSet(startEvent,  middleEvent1, 
middleEvent2),
-                       Sets.newHashSet(startEvent,  middleEvent1),
-                       Sets.newHashSet(startEvent)
-               ), resultingPatterns);
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                               Lists.newArrayList(startEvent,  middleEvent1, 
middleEvent2, middleEvent3),
+                               Lists.newArrayList(startEvent,  middleEvent1, 
middleEvent2),
+                               Lists.newArrayList(startEvent,  middleEvent1),
+                               Lists.newArrayList(startEvent)
+               ));
        }
 
        @Test
@@ -1749,25 +1536,12 @@ public class NFAITCase extends TestLogger {
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
-               Set<Set<Event>> resultingPatterns = new HashSet<>();
-               List<Collection<Event>> allPatterns = new ArrayList<>();
-
-               for (StreamRecord<Event> inputEvent : inputEvents) {
-                       Collection<Map<String, Event>> patterns = nfa.process(
-                               inputEvent.getValue(),
-                               inputEvent.getTimestamp()).f0;
-
-                       for (Map<String, Event> foundPattern : patterns) {
-                               resultingPatterns.add(new 
HashSet<>(foundPattern.values()));
-                               allPatterns.add(foundPattern.values());
-                       }
-               }
+               final List<List<Event>> resultingPatterns = 
feedNFA(inputEvents, nfa);
 
-               assertEquals(2, allPatterns.size());
-               assertEquals(Sets.newHashSet(
-                       Sets.newHashSet(startEvent,  middleEvent1),
-                       Sets.newHashSet(startEvent)
-               ), resultingPatterns);
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                               Lists.newArrayList(startEvent,  middleEvent1),
+                               Lists.newArrayList(startEvent)
+               ));
        }
 
        @Test
@@ -1972,7 +1746,7 @@ public class NFAITCase extends TestLogger {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               
}).times(2).consecutive().optional().followedBy("end1").where(new 
SimpleCondition<Event>() { // TODO: 4/4/17 also check order consecutive() vs 
optional()
+               
}).times(2).consecutive().optional().followedBy("end1").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -2063,7 +1837,7 @@ public class NFAITCase extends TestLogger {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               
}).times(2).consecutive().optional().followedBy("end1").where(new 
SimpleCondition<Event>() { // TODO: 4/4/17 also check order consecutive() vs 
optional()
+               
}).times(2).consecutive().optional().followedBy("end1").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -2108,7 +1882,7 @@ public class NFAITCase extends TestLogger {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               
}).times(2).allowCombinations().optional().followedBy("end1").where(new 
SimpleCondition<Event>() { // TODO: 4/4/17 also check order consecutive() vs 
optional()
+               
}).times(2).allowCombinations().optional().followedBy("end1").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
 
                        @Override
@@ -3165,26 +2939,28 @@ public class NFAITCase extends TestLogger {
                inputEvents.add(new StreamRecord<>(d, 5));
 
                Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
+                       private static final long serialVersionUID = 
5167288560432018992L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
                }).notNext("notPattern").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
2242479288129905510L;
+
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("b");
                        }
                }).followedByAny("middle").where(new SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
+                       private static final long serialVersionUID = 
1404509325548220892L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
                }).followedBy("end").where(new SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
+                       private static final long serialVersionUID = 
-8907427230007830915L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
@@ -3219,26 +2995,28 @@ public class NFAITCase extends TestLogger {
                inputEvents.add(new StreamRecord<>(d, 5));
 
                Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
+                       private static final long serialVersionUID = 
-339500190577666439L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
                }).notNext("notPattern").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
-6913980632538046451L;
+
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("b");
                        }
                }).followedBy("middle").where(new SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
+                       private static final long serialVersionUID = 
3332196998905139891L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
                }).followedBy("end").where(new SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
+                       private static final long serialVersionUID = 
2086563479959018387L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
@@ -3270,27 +3048,29 @@ public class NFAITCase extends TestLogger {
                inputEvents.add(new StreamRecord<>(b1, 5));
 
                Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
+                       private static final long serialVersionUID = 
1672995058886176627L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
                }).followedByAny("middle").where(new SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
+                       private static final long serialVersionUID = 
6003621617520261554L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
                }).followedByAny("end").where(new SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
+                       private static final long serialVersionUID = 
887700237024758417L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("d");
                        }
                }).notNext("notPattern").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5239529076086933032L;
+
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("b");
@@ -3321,26 +3101,28 @@ public class NFAITCase extends TestLogger {
                inputEvents.add(new StreamRecord<>(d, 5));
 
                Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
+                       private static final long serialVersionUID = 
-2641662468313191976L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
                }).notFollowedBy("notPattern").where(new 
SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
-3632144132379494778L;
+
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("b");
                        }
                }).followedByAny("middle").where(new SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
+                       private static final long serialVersionUID = 
3818766882138348167L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
                }).followedBy("end").where(new SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
+                       private static final long serialVersionUID = 
2033204730795451288L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
@@ -3374,26 +3156,28 @@ public class NFAITCase extends TestLogger {
                inputEvents.add(new StreamRecord<>(d, 5));
 
                Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
+                       private static final long serialVersionUID = 
-2454396370205097543L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
                }).notFollowedBy("notPattern").where(new 
SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
2749547391611263290L;
+
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("b");
                        }
                }).followedByAny("middle").where(new SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
+                       private static final long serialVersionUID = 
-4989511337298217255L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
                }).optional().followedBy("end").where(new 
SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
+                       private static final long serialVersionUID = 
-8466223836652936608L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
@@ -3427,26 +3211,28 @@ public class NFAITCase extends TestLogger {
                inputEvents.add(new StreamRecord<>(d, 5));
 
                Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
+                       private static final long serialVersionUID = 
-2568839911852184515L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
                }).followedByAny("middle").where(new SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
+                       private static final long serialVersionUID = 
-3632232424064269636L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("b");
                        }
                }).times(2).notFollowedBy("notPattern").where(new 
SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
3685596793523534611L;
+
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
                }).followedBy("end").where(new SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
+                       private static final long serialVersionUID = 
1960758663575587243L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
@@ -3482,26 +3268,28 @@ public class NFAITCase extends TestLogger {
                inputEvents.add(new StreamRecord<>(d2, 5));
 
                Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
+                       private static final long serialVersionUID = 
2814850350025111940L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
                }).notFollowedBy("notPattern").where(new 
SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
4988756153568853834L;
+
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("b");
                        }
                }).followedByAny("middle").where(new SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
+                       private static final long serialVersionUID = 
-225909103322018778L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
                }).times(2).optional().followedBy("end").where(new 
SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
+                       private static final long serialVersionUID = 
-924294627956373696L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
@@ -3539,26 +3327,28 @@ public class NFAITCase extends TestLogger {
                inputEvents.add(new StreamRecord<>(d2, 5));
 
                Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
+                       private static final long serialVersionUID = 
6193105689601702341L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
                }).followedByAny("middle").where(new SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
+                       private static final long serialVersionUID = 
5195859580923169111L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("b");
                        }
                }).times(2).notFollowedBy("notPattern").where(new 
SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
4973027956103783831L;
+
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
                }).followedBy("end").where(new SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
+                       private static final long serialVersionUID = 
2724622546678984894L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
@@ -3588,19 +3378,21 @@ public class NFAITCase extends TestLogger {
                inputEvents.add(new StreamRecord<>(c2, 4));
 
                Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
+                       private static final long serialVersionUID = 
-4289351792573443294L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
                }).notFollowedBy("notPattern").where(new 
SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
-4989574608417523507L;
+
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("b");
                        }
                }).followedByAny("end").where(new SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
+                       private static final long serialVersionUID = 
-5940131818629290579L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
@@ -3635,26 +3427,28 @@ public class NFAITCase extends TestLogger {
                inputEvents.add(new StreamRecord<>(d, 5));
 
                Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
+                       private static final long serialVersionUID = 
-7885381452276160322L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
                }).notFollowedBy("notPattern").where(new 
SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
3471511260235826653L;
+
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("b");
                        }
                }).followedByAny("middle").where(new SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
+                       private static final long serialVersionUID = 
9073793782452363833L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
                }).times(2).optional().followedBy("end").where(new 
SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
+                       private static final long serialVersionUID = 
7972902718259767076L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
@@ -3690,26 +3484,28 @@ public class NFAITCase extends TestLogger {
                inputEvents.add(new StreamRecord<>(d, 6));
 
                Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
+                       private static final long serialVersionUID = 
-7866220136345465444L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
                }).notFollowedBy("notPattern").where(new 
SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
4957837489028234932L;
+
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("b");
                        }
                }).followedBy("middle").where(new SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
+                       private static final long serialVersionUID = 
5569569968862808007L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
                }).followedBy("end").where(new SimpleCondition<Event>() {
-                       private static final long serialVersionUID = 
5726188262756267490L;
+                       private static final long serialVersionUID = 
-8579678167937416269L;
 
                        @Override
                        public boolean filter(Event value) throws Exception {
@@ -4125,12 +3921,16 @@ public class NFAITCase extends TestLogger {
                List<List<Event>> resultingPatterns = new ArrayList<>();
 
                for (StreamRecord<Event> inputEvent : inputEvents) {
-                       Collection<Map<String, Event>> patterns = nfa.process(
+                       Collection<Map<String, List<Event>>> patterns = 
nfa.process(
                                inputEvent.getValue(),
                                inputEvent.getTimestamp()).f0;
 
-                       for (Map<String, Event> p: patterns) {
-                               resultingPatterns.add(new 
ArrayList<>(p.values()));
+                       for (Map<String, List<Event>> p: patterns) {
+                               List<Event> res = new ArrayList<>();
+                               for (List<Event> le: p.values()) {
+                                       res.addAll(le);
+                               }
+                               resultingPatterns.add(res);
                        }
                }
                return resultingPatterns;

http://git-wip-us.apache.org/repos/asf/flink/blob/ae9c9d06/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
index d2e392b..11d193a 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
@@ -82,20 +82,20 @@ public class NFATest extends TestLogger {
                nfa.addState(endState);
                nfa.addState(endingState);
 
-               Set<Map<String, Event>> expectedPatterns = new HashSet<>();
+               Set<Map<String, List<Event>>> expectedPatterns = new 
HashSet<>();
 
-               Map<String, Event> firstPattern = new HashMap<>();
-               firstPattern.put("start", new Event(1, "start", 1.0));
-               firstPattern.put("end", new Event(4, "end", 4.0));
+               Map<String, List<Event>> firstPattern = new HashMap<>();
+               firstPattern.put("start", Collections.singletonList(new 
Event(1, "start", 1.0)));
+               firstPattern.put("end", Collections.singletonList(new Event(4, 
"end", 4.0)));
 
-               Map<String, Event> secondPattern = new HashMap<>();
-               secondPattern.put("start", new Event(3, "start", 3.0));
-               secondPattern.put("end", new Event(4, "end", 4.0));
+               Map<String, List<Event>> secondPattern = new HashMap<>();
+               secondPattern.put("start", Collections.singletonList(new 
Event(3, "start", 3.0)));
+               secondPattern.put("end", Collections.singletonList(new Event(4, 
"end", 4.0)));
 
                expectedPatterns.add(firstPattern);
                expectedPatterns.add(secondPattern);
 
-               Collection<Map<String, Event>> actualPatterns = runNFA(nfa, 
streamEvents);
+               Collection<Map<String, List<Event>>> actualPatterns = 
runNFA(nfa, streamEvents);
 
                assertEquals(expectedPatterns, actualPatterns);
        }
@@ -110,15 +110,15 @@ public class NFATest extends TestLogger {
                streamEvents.add(new StreamRecord<>(new Event(3, "start", 3.0), 
3L));
                streamEvents.add(new StreamRecord<>(new Event(4, "end", 4.0), 
4L));
 
-               Set<Map<String, Event>> expectedPatterns = new HashSet<>();
+               Set<Map<String, List<Event>>> expectedPatterns = new 
HashSet<>();
 
-               Map<String, Event> secondPattern = new HashMap<>();
-               secondPattern.put("start", new Event(3, "start", 3.0));
-               secondPattern.put("end", new Event(4, "end", 4.0));
+               Map<String, List<Event>> secondPattern = new HashMap<>();
+               secondPattern.put("start", Collections.singletonList(new 
Event(3, "start", 3.0)));
+               secondPattern.put("end", Collections.singletonList(new Event(4, 
"end", 4.0)));
 
                expectedPatterns.add(secondPattern);
 
-               Collection<Map<String, Event>> actualPatterns = runNFA(nfa, 
streamEvents);
+               Collection<Map<String, List<Event>>> actualPatterns = 
runNFA(nfa, streamEvents);
 
                assertEquals(expectedPatterns, actualPatterns);
        }
@@ -135,9 +135,9 @@ public class NFATest extends TestLogger {
                streamEvents.add(new StreamRecord<>(new Event(1, "start", 1.0), 
1L));
                streamEvents.add(new StreamRecord<>(new Event(2, "end", 2.0), 
3L));
 
-               Set<Map<String, Event>> expectedPatterns = 
Collections.emptySet();
+               Set<Map<String, List<Event>>> expectedPatterns = 
Collections.emptySet();
 
-               Collection<Map<String, Event>> actualPatterns = runNFA(nfa, 
streamEvents);
+               Collection<Map<String, List<Event>>> actualPatterns = 
runNFA(nfa, streamEvents);
 
                assertEquals(expectedPatterns, actualPatterns);
        }
@@ -156,40 +156,24 @@ public class NFATest extends TestLogger {
                streamEvents.add(new StreamRecord<>(new Event(3, "foobar", 
3.0), 3L));
                streamEvents.add(new StreamRecord<>(new Event(4, "end", 4.0), 
3L));
 
-               Set<Map<String, Event>> expectedPatterns = new HashSet<>();
+               Set<Map<String, List<Event>>> expectedPatterns = new 
HashSet<>();
 
-               Map<String, Event> secondPattern = new HashMap<>();
-               secondPattern.put("start", new Event(2, "start", 2.0));
-               secondPattern.put("end", new Event(4, "end", 4.0));
+               Map<String, List<Event>> secondPattern = new HashMap<>();
+               secondPattern.put("start", Collections.singletonList(new 
Event(2, "start", 2.0)));
+               secondPattern.put("end", Collections.singletonList(new Event(4, 
"end", 4.0)));
 
                expectedPatterns.add(secondPattern);
 
-               Collection<Map<String, Event>> actualPatterns = runNFA(nfa, 
streamEvents);
+               Collection<Map<String, List<Event>>> actualPatterns = 
runNFA(nfa, streamEvents);
 
                assertEquals(expectedPatterns, actualPatterns);
        }
 
-       @Test
-       public void testStateNameGeneration() {
-               String expectedName1 = "a[2]";
-               String expectedName2 = "a_3";
-               String expectedName3 = "a[][42]";
-
-               String generatedName1 = NFA.generateStateName("a[]", 2);
-               String generatedName2 = NFA.generateStateName("a", 3);
-               String generatedName3 = NFA.generateStateName("a[][]", 42);
-
-
-               assertEquals(expectedName1, generatedName1);
-               assertEquals(expectedName2, generatedName2);
-               assertEquals(expectedName3, generatedName3);
-       }
-
-       public <T> Collection<Map<String, T>> runNFA(NFA<T> nfa, 
List<StreamRecord<T>> inputs) {
-               Set<Map<String, T>> actualPatterns = new HashSet<>();
+       public <T> Collection<Map<String, List<T>>> runNFA(NFA<T> nfa, 
List<StreamRecord<T>> inputs) {
+               Set<Map<String, List<T>>> actualPatterns = new HashSet<>();
 
                for (StreamRecord<T> streamEvent : inputs) {
-                       Collection<Map<String, T>> matchedPatterns = 
nfa.process(
+                       Collection<Map<String, List<T>>> matchedPatterns = 
nfa.process(
                                streamEvent.getValue(),
                                streamEvent.getTimestamp()).f0;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ae9c9d06/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
index adc07b3..2da3c31 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
@@ -18,7 +18,8 @@
 
 package org.apache.flink.cep.nfa;
 
-import com.google.common.collect.LinkedHashMultimap;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
 import org.apache.flink.cep.Event;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
@@ -48,12 +49,12 @@ public class SharedBufferTest extends TestLogger {
                        events[i] = new Event(i + 1, "e" + (i + 1), i);
                }
 
-               LinkedHashMultimap<String, Event> expectedPattern1 = 
LinkedHashMultimap.create();
+               ListMultimap<String, Event> expectedPattern1 = 
ArrayListMultimap.create();
                expectedPattern1.put("a1", events[2]);
                expectedPattern1.put("a[]", events[3]);
                expectedPattern1.put("b", events[5]);
 
-               LinkedHashMultimap<String, Event> expectedPattern2 = 
LinkedHashMultimap.create();
+               ListMultimap<String, Event> expectedPattern2 = 
ArrayListMultimap.create();
                expectedPattern2.put("a1", events[0]);
                expectedPattern2.put("a[]", events[1]);
                expectedPattern2.put("a[]", events[2]);
@@ -61,7 +62,7 @@ public class SharedBufferTest extends TestLogger {
                expectedPattern2.put("a[]", events[4]);
                expectedPattern2.put("b", events[5]);
 
-               LinkedHashMultimap<String, Event> expectedPattern3 = 
LinkedHashMultimap.create();
+               ListMultimap<String, Event> expectedPattern3 = 
ArrayListMultimap.create();
                expectedPattern3.put("a1", events[0]);
                expectedPattern3.put("a[]", events[1]);
                expectedPattern3.put("a[]", events[2]);
@@ -84,11 +85,11 @@ public class SharedBufferTest extends TestLogger {
                sharedBuffer.put("a[]", events[6], timestamp, "a[]", events[5], 
timestamp, DeweyNumber.fromString("1.1"));
                sharedBuffer.put("b", events[7], timestamp, "a[]", events[6], 
timestamp, DeweyNumber.fromString("1.1.0"));
 
-               Collection<LinkedHashMultimap<String, Event>> patterns3 = 
sharedBuffer.extractPatterns("b", events[7], timestamp, 
DeweyNumber.fromString("1.1.0"));
+               Collection<ListMultimap<String, Event>> patterns3 = 
sharedBuffer.extractPatterns("b", events[7], timestamp, 
DeweyNumber.fromString("1.1.0"));
                sharedBuffer.release("b", events[7], timestamp);
-               Collection<LinkedHashMultimap<String, Event>> patterns4 = 
sharedBuffer.extractPatterns("b", events[7], timestamp, 
DeweyNumber.fromString("1.1.0"));
-               Collection<LinkedHashMultimap<String, Event>> patterns1 = 
sharedBuffer.extractPatterns("b", events[5], timestamp, 
DeweyNumber.fromString("2.0.0"));
-               Collection<LinkedHashMultimap<String, Event>> patterns2 = 
sharedBuffer.extractPatterns("b", events[5], timestamp, 
DeweyNumber.fromString("1.0.0"));
+               Collection<ListMultimap<String, Event>> patterns4 = 
sharedBuffer.extractPatterns("b", events[7], timestamp, 
DeweyNumber.fromString("1.1.0"));
+               Collection<ListMultimap<String, Event>> patterns1 = 
sharedBuffer.extractPatterns("b", events[5], timestamp, 
DeweyNumber.fromString("2.0.0"));
+               Collection<ListMultimap<String, Event>> patterns2 = 
sharedBuffer.extractPatterns("b", events[5], timestamp, 
DeweyNumber.fromString("1.0.0"));
                sharedBuffer.release("b", events[5], timestamp);
 
                assertEquals(1L, patterns3.size());

http://git-wip-us.apache.org/repos/asf/flink/blob/ae9c9d06/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
index 90a6321..26b8ce9 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.cep.nfa.compiler;
 
-import com.google.common.collect.Sets;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;

http://git-wip-us.apache.org/repos/asf/flink/blob/ae9c9d06/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
index 2f7cdeb..afb3e7c 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.streaming.util.OperatorSnapshotUtil;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
@@ -72,7 +73,7 @@ public class CEPFrom12MigrationTest {
                final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 
10.0);
                final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 
10.0);
 
-               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness =
+               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness =
                                new KeyedOneInputStreamOperatorTestHarness<>(
                                                new KeyedCEPPatternOperator<>(
                                                                
Event.createTypeSerializer(),
@@ -120,7 +121,7 @@ public class CEPFrom12MigrationTest {
                final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 
10.0);
                final Event endEvent = new Event(42, "end", 1.0);
 
-               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness =
+               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness =
                                new KeyedOneInputStreamOperatorTestHarness<>(
                                                new KeyedCEPPatternOperator<>(
                                                                
Event.createTypeSerializer(),
@@ -160,18 +161,18 @@ public class CEPFrom12MigrationTest {
                assertTrue(resultRecord2.getValue() instanceof Map);
 
                @SuppressWarnings("unchecked")
-               Map<String, Event> patternMap1 = (Map<String, Event>) 
resultRecord1.getValue();
+               Map<String, List<Event>> patternMap1 = (Map<String, 
List<Event>>) resultRecord1.getValue();
 
-               assertEquals(startEvent, patternMap1.get("start"));
-               assertEquals(middleEvent1, patternMap1.get("middle"));
-               assertEquals(endEvent, patternMap1.get("end"));
+               assertEquals(startEvent, patternMap1.get("start").get(0));
+               assertEquals(middleEvent1, patternMap1.get("middle").get(0));
+               assertEquals(endEvent, patternMap1.get("end").get(0));
 
                @SuppressWarnings("unchecked")
-               Map<String, Event> patternMap2 = (Map<String, Event>) 
resultRecord2.getValue();
+               Map<String, List<Event>> patternMap2 = (Map<String, 
List<Event>>) resultRecord2.getValue();
 
-               assertEquals(startEvent, patternMap2.get("start"));
-               assertEquals(middleEvent2, patternMap2.get("middle"));
-               assertEquals(endEvent, patternMap2.get("end"));
+               assertEquals(startEvent, patternMap2.get("start").get(0));
+               assertEquals(middleEvent2, patternMap2.get("middle").get(0));
+               assertEquals(endEvent, patternMap2.get("end").get(0));
 
                harness.close();
        }
@@ -195,7 +196,7 @@ public class CEPFrom12MigrationTest {
                final Event startEvent1 = new Event(42, "start", 1.0);
                final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 
10.0);
 
-               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness =
+               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness =
                                new KeyedOneInputStreamOperatorTestHarness<>(
                                                new KeyedCEPPatternOperator<>(
                                                                
Event.createTypeSerializer(),
@@ -241,7 +242,7 @@ public class CEPFrom12MigrationTest {
                final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 
10.0);
                final Event endEvent = new Event(42, "end", 1.0);
 
-               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness =
+               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness =
                                new KeyedOneInputStreamOperatorTestHarness<>(
                                                new KeyedCEPPatternOperator<>(
                                                                
Event.createTypeSerializer(),
@@ -287,25 +288,25 @@ public class CEPFrom12MigrationTest {
                assertTrue(resultRecord3.getValue() instanceof Map);
 
                @SuppressWarnings("unchecked")
-               Map<String, Event> patternMap1 = (Map<String, Event>) 
resultRecord1.getValue();
+               Map<String, List<Event>> patternMap1 = (Map<String, 
List<Event>>) resultRecord1.getValue();
 
-               assertEquals(startEvent1, patternMap1.get("start"));
-               assertEquals(middleEvent1, patternMap1.get("middle"));
-               assertEquals(endEvent, patternMap1.get("end"));
+               assertEquals(startEvent1, patternMap1.get("start").get(0));
+               assertEquals(middleEvent1, patternMap1.get("middle").get(0));
+               assertEquals(endEvent, patternMap1.get("end").get(0));
 
                @SuppressWarnings("unchecked")
-               Map<String, Event> patternMap2 = (Map<String, Event>) 
resultRecord2.getValue();
+               Map<String, List<Event>> patternMap2 = (Map<String, 
List<Event>>) resultRecord2.getValue();
 
-               assertEquals(startEvent1, patternMap2.get("start"));
-               assertEquals(middleEvent2, patternMap2.get("middle"));
-               assertEquals(endEvent, patternMap2.get("end"));
+               assertEquals(startEvent1, patternMap2.get("start").get(0));
+               assertEquals(middleEvent2, patternMap2.get("middle").get(0));
+               assertEquals(endEvent, patternMap2.get("end").get(0));
 
                @SuppressWarnings("unchecked")
-               Map<String, Event> patternMap3 = (Map<String, Event>) 
resultRecord3.getValue();
+               Map<String, List<Event>> patternMap3 = (Map<String, 
List<Event>>) resultRecord3.getValue();
 
-               assertEquals(startEvent2, patternMap3.get("start"));
-               assertEquals(middleEvent2, patternMap3.get("middle"));
-               assertEquals(endEvent, patternMap3.get("end"));
+               assertEquals(startEvent2, patternMap3.get("start").get(0));
+               assertEquals(middleEvent2, patternMap3.get("middle").get(0));
+               assertEquals(endEvent, patternMap3.get("end").get(0));
 
                harness.close();
        }
@@ -328,7 +329,7 @@ public class CEPFrom12MigrationTest {
 
                final Event startEvent1 = new Event(42, "start", 1.0);
 
-               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness =
+               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness =
                                new KeyedOneInputStreamOperatorTestHarness<>(
                                                new KeyedCEPPatternOperator<>(
                                                                
Event.createTypeSerializer(),
@@ -367,7 +368,7 @@ public class CEPFrom12MigrationTest {
 
                final Event startEvent1 = new Event(42, "start", 1.0);
 
-               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness =
+               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness =
                                new KeyedOneInputStreamOperatorTestHarness<>(
                                                new KeyedCEPPatternOperator<>(
                                                                
Event.createTypeSerializer(),
@@ -401,9 +402,9 @@ public class CEPFrom12MigrationTest {
                assertTrue(resultRecord.getValue() instanceof Map);
 
                @SuppressWarnings("unchecked")
-               Map<String, Event> patternMap = (Map<String, Event>) 
resultRecord.getValue();
+               Map<String, List<Event>> patternMap = (Map<String, 
List<Event>>) resultRecord.getValue();
 
-               assertEquals(startEvent1, patternMap.get("start"));
+               assertEquals(startEvent1, patternMap.get("start").get(0));
 
                harness.close();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/ae9c9d06/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
index 4e05fcf..404de54 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
@@ -37,6 +37,7 @@ import 
org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.junit.Test;
 
 import java.net.URL;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
@@ -95,7 +96,7 @@ public class CEPMigration11to13Test {
                harness.close();
                */
 
-               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness =
+               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness =
                                new KeyedOneInputStreamOperatorTestHarness<>(
                                                new KeyedCEPPatternOperator<>(
                                                                
Event.createTypeSerializer(),
@@ -129,11 +130,11 @@ public class CEPMigration11to13Test {
                assertTrue(resultRecord.getValue() instanceof Map);
 
                @SuppressWarnings("unchecked")
-               Map<String, Event> patternMap = (Map<String, Event>) 
resultRecord.getValue();
+               Map<String, List<Event>> patternMap = (Map<String, 
List<Event>>) resultRecord.getValue();
 
-               assertEquals(startEvent, patternMap.get("start"));
-               assertEquals(middleEvent, patternMap.get("middle"));
-               assertEquals(endEvent, patternMap.get("end"));
+               assertEquals(startEvent, patternMap.get("start").get(0));
+               assertEquals(middleEvent, patternMap.get("middle").get(0));
+               assertEquals(endEvent, patternMap.get("end").get(0));
 
                harness.close();
        }
@@ -170,7 +171,7 @@ public class CEPMigration11to13Test {
 
                NullByteKeySelector keySelector = new NullByteKeySelector();
 
-               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness =
+               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness =
                                new KeyedOneInputStreamOperatorTestHarness<>(
                                                new KeyedCEPPatternOperator<>(
                                                                
Event.createTypeSerializer(),
@@ -204,11 +205,11 @@ public class CEPMigration11to13Test {
                assertTrue(resultRecord.getValue() instanceof Map);
 
                @SuppressWarnings("unchecked")
-               Map<String, Event> patternMap = (Map<String, Event>) 
resultRecord.getValue();
+               Map<String, List<Event>> patternMap = (Map<String, 
List<Event>>) resultRecord.getValue();
 
-               assertEquals(startEvent, patternMap.get("start"));
-               assertEquals(middleEvent, patternMap.get("middle"));
-               assertEquals(endEvent, patternMap.get("end"));
+               assertEquals(startEvent, patternMap.get("start").get(0));
+               assertEquals(middleEvent, patternMap.get("middle").get(0));
+               assertEquals(endEvent, patternMap.get("end").get(0));
 
                harness.close();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/ae9c9d06/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 4048bc2..5ed8b46 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -46,7 +46,9 @@ import org.junit.rules.TemporaryFolder;
 
 import static org.junit.Assert.*;
 
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 
@@ -58,7 +60,7 @@ public class CEPOperatorTest extends TestLogger {
        @Test
        public void testKeyedCEPOperatorWatermarkForwarding() throws Exception {
 
-               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness = getCepTestHarness(false);
+               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness = getCepTestHarness(false);
 
                harness.open();
 
@@ -74,7 +76,7 @@ public class CEPOperatorTest extends TestLogger {
        @Test
        public void testKeyedCEPOperatorCheckpointing() throws Exception {
 
-               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness = getCepTestHarness(false);
+               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness = getCepTestHarness(false);
 
                harness.open();
 
@@ -138,7 +140,7 @@ public class CEPOperatorTest extends TestLogger {
                RocksDBStateBackend rocksDBStateBackend = new 
RocksDBStateBackend(new MemoryStateBackend());
                rocksDBStateBackend.setDbStoragePath(rocksDbPath);
 
-               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness = getCepTestHarness(false);
+               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness = getCepTestHarness(false);
 
                harness.setStateBackend(rocksDBStateBackend);
 
@@ -208,7 +210,6 @@ public class CEPOperatorTest extends TestLogger {
         * Tests that the internal time of a CEP operator advances only given 
watermarks. See FLINK-5033
         */
        @Test
-       @SuppressWarnings("unchecked")
        public void testKeyedAdvancingTimeWithoutElements() throws Exception {
                final KeySelector<Event, Integer> keySelector = new 
TestKeySelector();
 
@@ -216,10 +217,10 @@ public class CEPOperatorTest extends TestLogger {
                final long watermarkTimestamp1 = 5L;
                final long watermarkTimestamp2 = 13L;
 
-               final Map<String, Event> expectedSequence = new HashMap<>(2);
-               expectedSequence.put("start", startEvent);
+               final Map<String, List<Event>> expectedSequence = new 
HashMap<>(2);
+               expectedSequence.put("start", 
Collections.<Event>singletonList(startEvent));
 
-               OneInputStreamOperatorTestHarness<Event, 
Either<Tuple2<Map<String, Event>, Long>, Map<String, Event>>> harness = new 
KeyedOneInputStreamOperatorTestHarness<>(
+               OneInputStreamOperatorTestHarness<Event, 
Either<Tuple2<Map<String, List<Event>>, Long>, Map<String, List<Event>>>> 
harness = new KeyedOneInputStreamOperatorTestHarness<>(
                        new TimeoutKeyedCEPPatternOperator<>(
                                Event.createTypeSerializer(),
                                false,
@@ -234,7 +235,7 @@ public class CEPOperatorTest extends TestLogger {
                try {
                        harness.setup(
                                new KryoSerializer<>(
-                                       (Class<Either<Tuple2<Map<String, 
Event>, Long>, Map<String, Event>>>) (Object) Either.class,
+                                       (Class<Either<Tuple2<Map<String, 
List<Event>>, Long>, Map<String, List<Event>>>>) (Object) Either.class,
                                        new ExecutionConfig()));
                        harness.open();
 
@@ -256,13 +257,15 @@ public class CEPOperatorTest extends TestLogger {
 
                        assertTrue(resultObject instanceof StreamRecord);
 
-                       StreamRecord<Either<Tuple2<Map<String, Event>, Long>, 
Map<String, Event>>> streamRecord = 
(StreamRecord<Either<Tuple2<Map<String,Event>,Long>,Map<String,Event>>>) 
resultObject;
+                       StreamRecord<Either<Tuple2<Map<String, List<Event>>, 
Long>, Map<String, List<Event>>>> streamRecord =
+                                       
(StreamRecord<Either<Tuple2<Map<String,List<Event>>,Long>,Map<String,List<Event>>>>)
 resultObject;
 
                        assertTrue(streamRecord.getValue() instanceof 
Either.Left);
 
-                       Either.Left<Tuple2<Map<String, Event>, Long>, 
Map<String, Event>> left = (Either.Left<Tuple2<Map<String, Event>, Long>, 
Map<String, Event>>) streamRecord.getValue();
+                       Either.Left<Tuple2<Map<String, List<Event>>, Long>, 
Map<String, List<Event>>> left =
+                       (Either.Left<Tuple2<Map<String, List<Event>>, Long>, 
Map<String, List<Event>>>) streamRecord.getValue();
 
-                       Tuple2<Map<String, Event>, Long> leftResult = 
left.left();
+                       Tuple2<Map<String, List<Event>>, Long> leftResult = 
left.left();
 
                        assertEquals(watermarkTimestamp2, (long) leftResult.f1);
                        assertEquals(expectedSequence, leftResult.f0);
@@ -292,7 +295,7 @@ public class CEPOperatorTest extends TestLogger {
 
                TestKeySelector keySelector = new TestKeySelector();
                KeyedCEPPatternOperator<Event, Integer> operator = 
getKeyedCepOpearator(false, keySelector);
-               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness = getCepTestHarness(operator);
+               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness = getCepTestHarness(operator);
 
                harness.open();
 
@@ -380,7 +383,7 @@ public class CEPOperatorTest extends TestLogger {
 
                TestKeySelector keySelector = new TestKeySelector();
                KeyedCEPPatternOperator<Event, Integer> operator = 
getKeyedCepOpearator(true, keySelector);
-               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness = getCepTestHarness(operator);
+               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness = getCepTestHarness(operator);
 
                harness.open();
 
@@ -449,13 +452,13 @@ public class CEPOperatorTest extends TestLogger {
                assertTrue(resultRecord.getValue() instanceof Map);
 
                @SuppressWarnings("unchecked")
-               Map<String, Event> patternMap = (Map<String, Event>) 
resultRecord.getValue();
-               assertEquals(start, patternMap.get("start"));
-               assertEquals(middle, patternMap.get("middle"));
-               assertEquals(end, patternMap.get("end"));
+               Map<String, List<Event>> patternMap = (Map<String, 
List<Event>>) resultRecord.getValue();
+               assertEquals(start, patternMap.get("start").get(0));
+               assertEquals(middle, patternMap.get("middle").get(0));
+               assertEquals(end, patternMap.get("end").get(0));
        }
 
-       private OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
getCepTestHarness(boolean isProcessingTime) throws Exception {
+       private OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> getCepTestHarness(boolean isProcessingTime) throws Exception {
                KeySelector<Event, Integer> keySelector = new TestKeySelector();
 
                return new KeyedOneInputStreamOperatorTestHarness<>(
@@ -464,7 +467,7 @@ public class CEPOperatorTest extends TestLogger {
                        BasicTypeInfo.INT_TYPE_INFO);
        }
 
-       private OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
getCepTestHarness(
+       private OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> getCepTestHarness(
                        KeyedCEPPatternOperator<Event, Integer> cepOperator) 
throws Exception {
                KeySelector<Event, Integer> keySelector = new TestKeySelector();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ae9c9d06/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
index a048183..0210ef9 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
@@ -37,6 +37,7 @@ import 
org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.junit.Test;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 
@@ -79,7 +80,7 @@ public class CEPRescalingTest {
 
                // now we start the test, we go from parallelism 1 to 2.
 
-               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness =
+               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness =
                        getTestHarness(maxParallelism, 1, 0);
                harness.open();
 
@@ -99,7 +100,7 @@ public class CEPRescalingTest {
                // so we initialize the two tasks and we put the rest of
                // the valid elements for the pattern on task 0.
 
-               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness1 =
+               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness1 =
                        getTestHarness(maxParallelism, 2, 0);
 
                harness1.setup();
@@ -120,7 +121,7 @@ public class CEPRescalingTest {
                verifyWatermark(harness1.getOutput().poll(), 2);
                verifyPattern(harness1.getOutput().poll(), startEvent1, 
middleEvent1, endEvent1);
 
-               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness2 =
+               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness2 =
                        getTestHarness(maxParallelism, 2, 1);
 
                harness2.setup();
@@ -198,15 +199,15 @@ public class CEPRescalingTest {
 
                // starting the test, we will go from parallelism of 3 to 
parallelism of 2
 
-               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness1 =
+               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness1 =
                        getTestHarness(maxParallelism, 3, 0);
                harness1.open();
 
-               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness2 =
+               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness2 =
                        getTestHarness(maxParallelism, 3, 1);
                harness2.open();
 
-               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness3 =
+               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness3 =
                        getTestHarness(maxParallelism, 3, 2);
                harness3.open();
 
@@ -251,13 +252,13 @@ public class CEPRescalingTest {
                        harness3.snapshot(0, 0)
                );
 
-               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness4 =
+               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness4 =
                        getTestHarness(maxParallelism, 2, 0);
                harness4.setup();
                harness4.initializeState(snapshot);
                harness4.open();
 
-               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness5 =
+               OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness5 =
                        getTestHarness(maxParallelism, 2, 1);
                harness5.setup();
                harness5.initializeState(snapshot);
@@ -295,8 +296,8 @@ public class CEPRescalingTest {
                assertTrue(resultRecord.getValue() instanceof Map);
 
                @SuppressWarnings("unchecked")
-               Map<String, Event> patternMap = (Map<String, Event>) 
resultRecord.getValue();
-               if (patternMap.get("start").getId() == 7) {
+               Map<String, List<Event>> patternMap = (Map<String, 
List<Event>>) resultRecord.getValue();
+               if (patternMap.get("start").get(0).getId() == 7) {
                        verifyPattern(harness4.getOutput().poll(), startEvent1, 
middleEvent1, endEvent1);
                        verifyPattern(harness4.getOutput().poll(), startEvent3, 
middleEvent3, endEvent3);
                } else {
@@ -327,13 +328,13 @@ public class CEPRescalingTest {
                assertTrue(resultRecord.getValue() instanceof Map);
 
                @SuppressWarnings("unchecked")
-               Map<String, Event> patternMap = (Map<String, Event>) 
resultRecord.getValue();
-               assertEquals(start, patternMap.get("start"));
-               assertEquals(middle, patternMap.get("middle"));
-               assertEquals(end, patternMap.get("end"));
+               Map<String, List<Event>> patternMap = (Map<String, 
List<Event>>) resultRecord.getValue();
+               assertEquals(start, patternMap.get("start").get(0));
+               assertEquals(middle, patternMap.get("middle").get(0));
+               assertEquals(end, patternMap.get("end").get(0));
        }
 
-       private KeyedOneInputStreamOperatorTestHarness<Integer, Event, 
Map<String, Event>> getTestHarness(
+       private KeyedOneInputStreamOperatorTestHarness<Integer, Event, 
Map<String, List<Event>>> getTestHarness(
                int maxParallelism,
                int taskParallelism,
                int subtaskIdx) throws Exception {

Reply via email to