Repository: flink Updated Branches: refs/heads/master 9244106b3 -> 00ce3f1b1
http://git-wip-us.apache.org/repos/asf/flink/blob/ae9c9d06/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java index 2cc67e5..46e2fd4 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java @@ -19,7 +19,6 @@ package org.apache.flink.cep.nfa; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import com.google.common.primitives.Doubles; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cep.Event; @@ -156,22 +155,11 @@ public class NFAITCase extends TestLogger { NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - List<Map<String, Event>> resultingPatterns = new ArrayList<>(); - - for (StreamRecord<Event> inputEvent: inputEvents) { - Collection<Map<String, Event>> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - resultingPatterns.addAll(patterns); - } - - assertEquals(1, resultingPatterns.size()); - Map<String, Event> patternMap = resultingPatterns.get(0); + List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(startEvent, patternMap.get("start")); - assertEquals(middleEvent, patternMap.get("middle")); - assertEquals(endEvent, patternMap.get("end")); + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(startEvent, middleEvent, endEvent) + )); } @Test @@ -202,24 +190,11 @@ public class NFAITCase extends TestLogger { NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - Set<Set<Event>> resultingPatterns = new HashSet<>(); - List<Collection<Event>> allPatterns = new ArrayList<>(); - - for (StreamRecord<Event> inputEvent : inputEvents) { - Collection<Map<String, Event>> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map<String, Event> foundPattern : patterns) { - resultingPatterns.add(new HashSet<>(foundPattern.values())); - allPatterns.add(foundPattern.values()); - } - } + List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(1, allPatterns.size()); - assertEquals(Sets.<Set<Event>>newHashSet( - Sets.newHashSet(middleEvent1, end) - ), resultingPatterns); + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(middleEvent1, end) + )); } @Test @@ -252,19 +227,9 @@ public class NFAITCase extends TestLogger { NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - Set<Set<Event>> resultingPatterns = new HashSet<>(); - - for (StreamRecord<Event> inputEvent : inputEvents) { - Collection<Map<String, Event>> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map<String, Event> foundPattern : patterns) { - resultingPatterns.add(new HashSet<>(foundPattern.values())); - } - } + List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(Sets.newHashSet(), resultingPatterns); + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList()); } /** @@ -274,7 +239,6 @@ public class NFAITCase extends TestLogger { @Test public void testSimplePatternWithTimeWindowNFA() { List<StreamRecord<Event>> events = new ArrayList<>(); - List<Map<String, Event>> resultingPatterns = new ArrayList<>(); final Event startEvent; final Event middleEvent; @@ -313,21 +277,11 @@ public class NFAITCase extends TestLogger { NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - for (StreamRecord<Event> event: events) { - Collection<Map<String, Event>> patterns = nfa.process( - event.getValue(), - event.getTimestamp()).f0; - - resultingPatterns.addAll(patterns); - } - - assertEquals(1, resultingPatterns.size()); - - Map<String, Event> patternMap = resultingPatterns.get(0); + List<List<Event>> resultingPatterns = feedNFA(events, nfa); - assertEquals(startEvent, patternMap.get("start")); - assertEquals(middleEvent, patternMap.get("middle")); - assertEquals(endEvent, patternMap.get("end")); + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(startEvent, middleEvent, endEvent) + )); } /** @@ -337,9 +291,9 @@ public class NFAITCase extends TestLogger { @Test public void testSimplePatternWithTimeoutHandling() { List<StreamRecord<Event>> events = new ArrayList<>(); - List<Map<String, Event>> resultingPatterns = new ArrayList<>(); - Set<Tuple2<Map<String, Event>, Long>> resultingTimeoutPatterns = new HashSet<>(); - Set<Tuple2<Map<String, Event>, Long>> expectedTimeoutPatterns = new HashSet<>(); + List<Map<String, List<Event>>> resultingPatterns = new ArrayList<>(); + Set<Tuple2<Map<String, List<Event>>, Long>> resultingTimeoutPatterns = new HashSet<>(); + Set<Tuple2<Map<String, List<Event>>, Long>> expectedTimeoutPatterns = new HashSet<>(); events.add(new StreamRecord<>(new Event(1, "start", 1.0), 1)); events.add(new StreamRecord<>(new Event(2, "start", 1.0), 2)); @@ -348,19 +302,19 @@ public class NFAITCase extends TestLogger { events.add(new StreamRecord<>(new Event(5, "end", 1.0), 11)); events.add(new StreamRecord<>(new Event(6, "end", 1.0), 13)); - Map<String, Event> timeoutPattern1 = new HashMap<>(); - timeoutPattern1.put("start", new Event(1, "start", 1.0)); - timeoutPattern1.put("middle", new Event(3, "middle", 1.0)); + Map<String, List<Event>> timeoutPattern1 = new HashMap<>(); + timeoutPattern1.put("start", Collections.singletonList(new Event(1, "start", 1.0))); + timeoutPattern1.put("middle", Collections.singletonList(new Event(3, "middle", 1.0))); - Map<String, Event> timeoutPattern2 = new HashMap<>(); - timeoutPattern2.put("start", new Event(2, "start", 1.0)); - timeoutPattern2.put("middle", new Event(3, "middle", 1.0)); + Map<String, List<Event>> timeoutPattern2 = new HashMap<>(); + timeoutPattern2.put("start", Collections.singletonList(new Event(2, "start", 1.0))); + timeoutPattern2.put("middle", Collections.singletonList(new Event(3, "middle", 1.0))); - Map<String, Event> timeoutPattern3 = new HashMap<>(); - timeoutPattern3.put("start", new Event(1, "start", 1.0)); + Map<String, List<Event>> timeoutPattern3 = new HashMap<>(); + timeoutPattern3.put("start", Collections.singletonList(new Event(1, "start", 1.0))); - Map<String, Event> timeoutPattern4 = new HashMap<>(); - timeoutPattern4.put("start", new Event(2, "start", 1.0)); + Map<String, List<Event>> timeoutPattern4 = new HashMap<>(); + timeoutPattern4.put("start", Collections.singletonList(new Event(2, "start", 1.0))); expectedTimeoutPatterns.add(Tuple2.of(timeoutPattern1, 11L)); expectedTimeoutPatterns.add(Tuple2.of(timeoutPattern2, 13L)); @@ -393,10 +347,11 @@ public class NFAITCase extends TestLogger { NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), true); for (StreamRecord<Event> event: events) { - final Tuple2<Collection<Map<String, Event>>, Collection<Tuple2<Map<String, Event>, Long>>> patterns = nfa.process(event.getValue(), event.getTimestamp()); + Tuple2<Collection<Map<String, List<Event>>>, Collection<Tuple2<Map<String, List<Event>>, Long>>> patterns = + nfa.process(event.getValue(), event.getTimestamp()); - Collection<Map<String, Event>> matchedPatterns = patterns.f0; - Collection<Tuple2<Map<String, Event>, Long>> timeoutPatterns = patterns.f1; + Collection<Map<String, List<Event>>> matchedPatterns = patterns.f0; + Collection<Tuple2<Map<String, List<Event>>, Long>> timeoutPatterns = patterns.f1; resultingPatterns.addAll(matchedPatterns); resultingTimeoutPatterns.addAll(timeoutPatterns); @@ -460,31 +415,16 @@ public class NFAITCase extends TestLogger { NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - List<Map<String, Event>> resultingPatterns = new ArrayList<>(); - - for (StreamRecord<Event> inputEvent: inputEvents) { - Collection<Map<String, Event>> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - resultingPatterns.addAll(patterns); - } - - assertEquals(6, resultingPatterns.size()); - - final Set<Set<Event>> patterns = new HashSet<>(); - for (Map<String, Event> resultingPattern : resultingPatterns) { - patterns.add(new HashSet<>(resultingPattern.values())); - } + List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(Sets.newHashSet( - Sets.newHashSet(startEvent, middleEvent1, nextOne1, endEvent), - Sets.newHashSet(startEvent, middleEvent2, nextOne1, endEvent), - Sets.newHashSet(startEvent, middleEvent3, nextOne1, endEvent), - Sets.newHashSet(startEvent, middleEvent1, nextOne2, endEvent), - Sets.newHashSet(startEvent, middleEvent2, nextOne2, endEvent), - Sets.newHashSet(startEvent, middleEvent3, nextOne2, endEvent) - ), patterns); + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(startEvent, middleEvent1, nextOne1, endEvent), + Lists.newArrayList(startEvent, middleEvent2, nextOne1, endEvent), + Lists.newArrayList(startEvent, middleEvent3, nextOne1, endEvent), + Lists.newArrayList(startEvent, middleEvent1, nextOne2, endEvent), + Lists.newArrayList(startEvent, middleEvent2, nextOne2, endEvent), + Lists.newArrayList(startEvent, middleEvent3, nextOne2, endEvent) + )); } @Test @@ -548,39 +488,26 @@ public class NFAITCase extends TestLogger { NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - Set<Set<Event>> resultingPatterns = new HashSet<>(); - List<Collection<Event>> allPatterns = new ArrayList<>(); - - for (StreamRecord<Event> inputEvent : inputEvents) { - Collection<Map<String, Event>> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map<String, Event> foundPattern : patterns) { - resultingPatterns.add(new HashSet<>(foundPattern.values())); - allPatterns.add(foundPattern.values()); - } - } + List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(16, allPatterns.size()); - assertEquals(Sets.newHashSet( - Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3, end1, end2, end4), - Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1, end2, end4), - Sets.newHashSet(startEvent, middleEvent1, middleEvent3, end1, end2, end4), - Sets.newHashSet(startEvent, middleEvent2, middleEvent3, end1, end2, end4), - Sets.newHashSet(startEvent, middleEvent1, end1, end2, end4), - Sets.newHashSet(startEvent, middleEvent2, end1, end2, end4), - Sets.newHashSet(startEvent, middleEvent3, end1, end2, end4), - Sets.newHashSet(startEvent, end1, end2, end4), - Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3, end1, end3, end4), - Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1, end3, end4), - Sets.newHashSet(startEvent, middleEvent1, middleEvent3, end1, end3, end4), - Sets.newHashSet(startEvent, middleEvent2, middleEvent3, end1, end3, end4), - Sets.newHashSet(startEvent, middleEvent1, end1, end3, end4), - Sets.newHashSet(startEvent, middleEvent2, end1, end3, end4), - Sets.newHashSet(startEvent, middleEvent3, end1, end3, end4), - Sets.newHashSet(startEvent, end1, end3, end4) - ), resultingPatterns); + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, end1, end2, end4), + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end1, end2, end4), + Lists.newArrayList(startEvent, middleEvent1, middleEvent3, end1, end2, end4), + Lists.newArrayList(startEvent, middleEvent2, middleEvent3, end1, end2, end4), + Lists.newArrayList(startEvent, middleEvent1, end1, end2, end4), + Lists.newArrayList(startEvent, middleEvent2, end1, end2, end4), + Lists.newArrayList(startEvent, middleEvent3, end1, end2, end4), + Lists.newArrayList(startEvent, end1, end2, end4), + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, end1, end3, end4), + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end1, end3, end4), + Lists.newArrayList(startEvent, middleEvent1, middleEvent3, end1, end3, end4), + Lists.newArrayList(startEvent, middleEvent2, middleEvent3, end1, end3, end4), + Lists.newArrayList(startEvent, middleEvent1, end1, end3, end4), + Lists.newArrayList(startEvent, middleEvent2, end1, end3, end4), + Lists.newArrayList(startEvent, middleEvent3, end1, end3, end4), + Lists.newArrayList(startEvent, end1, end3, end4) + )); } @Test @@ -674,27 +601,14 @@ public class NFAITCase extends TestLogger { NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - Set<Set<Event>> resultingPatterns = new HashSet<>(); - List<Collection<Event>> allPatterns = new ArrayList<>(); - - for (StreamRecord<Event> inputEvent : inputEvents) { - Collection<Map<String, Event>> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map<String, Event> foundPattern : patterns) { - resultingPatterns.add(new HashSet<>(foundPattern.values())); - allPatterns.add(foundPattern.values()); - } - } + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(4, allPatterns.size()); - assertEquals(Sets.newHashSet( - Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3, end1), - Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1), - Sets.newHashSet(startEvent, middleEvent1, end1), - Sets.newHashSet(startEvent, end1) - ), resultingPatterns); + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, end1), + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end1), + Lists.newArrayList(startEvent, middleEvent1, end1), + Lists.newArrayList(startEvent, end1) + )); } @Test @@ -729,30 +643,17 @@ public class NFAITCase extends TestLogger { NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - Set<Set<Event>> resultingPatterns = new HashSet<>(); - List<Collection<Event>> allPatterns = new ArrayList<>(); - - for (StreamRecord<Event> inputEvent : inputEvents) { - Collection<Map<String, Event>> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map<String, Event> foundPattern : patterns) { - resultingPatterns.add(new HashSet<>(foundPattern.values())); - allPatterns.add(foundPattern.values()); - } - } + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(7, allPatterns.size()); - assertEquals(Sets.newHashSet( - Sets.newHashSet(middleEvent1, middleEvent2, middleEvent3, end), - Sets.newHashSet(middleEvent1, middleEvent2, end), - Sets.newHashSet(middleEvent2, middleEvent3, end), - Sets.newHashSet(middleEvent1, end), - Sets.newHashSet(middleEvent2, end), - Sets.newHashSet(middleEvent3, end), - Sets.newHashSet(end) - ), resultingPatterns); + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(middleEvent1, middleEvent2, middleEvent3, end), + Lists.newArrayList(middleEvent1, middleEvent2, end), + Lists.newArrayList(middleEvent2, middleEvent3, end), + Lists.newArrayList(middleEvent1, end), + Lists.newArrayList(middleEvent2, end), + Lists.newArrayList(middleEvent3, end), + Lists.newArrayList(end) + )); } @Test @@ -805,29 +706,16 @@ public class NFAITCase extends TestLogger { NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - Set<Set<Event>> resultingPatterns = new HashSet<>(); - List<Collection<Event>> allPatterns = new ArrayList<>(); - - for (StreamRecord<Event> inputEvent : inputEvents) { - Collection<Map<String, Event>> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map<String, Event> foundPattern : patterns) { - resultingPatterns.add(new HashSet<>(foundPattern.values())); - allPatterns.add(foundPattern.values()); - } - } + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(6, allPatterns.size()); - assertEquals(Sets.newHashSet( - Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3, end), - Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end), - Sets.newHashSet(startEvent, middleEvent2, middleEvent3, end), - Sets.newHashSet(startEvent, middleEvent2, end), - Sets.newHashSet(startEvent, middleEvent1, end), - Sets.newHashSet(startEvent, end) - ), resultingPatterns); + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, end), + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end), + Lists.newArrayList(startEvent, middleEvent2, middleEvent3, end), + Lists.newArrayList(startEvent, middleEvent2, end), + Lists.newArrayList(startEvent, middleEvent1, end), + Lists.newArrayList(startEvent, end) + )); } @Test @@ -889,31 +777,18 @@ public class NFAITCase extends TestLogger { NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - Set<Set<Event>> resultingPatterns = new HashSet<>(); - List<Collection<Event>> allPatterns = new ArrayList<>(); - - for (StreamRecord<Event> inputEvent : inputEvents) { - Collection<Map<String, Event>> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map<String, Event> foundPattern : patterns) { - resultingPatterns.add(new HashSet<>(foundPattern.values())); - allPatterns.add(foundPattern.values()); - } - } + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(8, allPatterns.size()); - assertEquals(Sets.newHashSet( - Sets.newHashSet(startEvent, middleEvent1, merging, end), - Sets.newHashSet(startEvent, middleEvent1, merging, kleene1, end), - Sets.newHashSet(startEvent, middleEvent1, merging, kleene2, end), - Sets.newHashSet(startEvent, middleEvent1, merging, kleene1, kleene2, end), - Sets.newHashSet(startEvent, middleEvent2, merging, end), - Sets.newHashSet(startEvent, middleEvent2, merging, kleene1, end), - Sets.newHashSet(startEvent, middleEvent2, merging, kleene2, end), - Sets.newHashSet(startEvent, middleEvent2, merging, kleene1, kleene2, end) - ), resultingPatterns); + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(startEvent, middleEvent1, merging, end), + Lists.newArrayList(startEvent, middleEvent1, merging, kleene1, end), + Lists.newArrayList(startEvent, middleEvent1, merging, kleene2, end), + Lists.newArrayList(startEvent, middleEvent1, merging, kleene1, kleene2, end), + Lists.newArrayList(startEvent, middleEvent2, merging, end), + Lists.newArrayList(startEvent, middleEvent2, merging, kleene1, end), + Lists.newArrayList(startEvent, middleEvent2, merging, kleene2, end), + Lists.newArrayList(startEvent, middleEvent2, merging, kleene1, kleene2, end) + )); } @Test @@ -958,19 +833,9 @@ public class NFAITCase extends TestLogger { NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - Set<Set<Event>> resultingPatterns = new HashSet<>(); - - for (StreamRecord<Event> inputEvent : inputEvents) { - Collection<Map<String, Event>> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map<String, Event> foundPattern : patterns) { - resultingPatterns.add(new HashSet<>(foundPattern.values())); - } - } + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(Sets.newHashSet(), resultingPatterns); + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList()); } @Test @@ -1059,26 +924,13 @@ public class NFAITCase extends TestLogger { NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - Set<Set<Event>> resultingPatterns = new HashSet<>(); - List<Collection<Event>> allPatterns = new ArrayList<>(); - - for (StreamRecord<Event> inputEvent : inputEvents) { - Collection<Map<String, Event>> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map<String, Event> foundPattern : patterns) { - resultingPatterns.add(new HashSet<>(foundPattern.values())); - allPatterns.add(foundPattern.values()); - } - } + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(3, allPatterns.size()); - assertEquals(Sets.newHashSet( - Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1), - Sets.newHashSet(startEvent, middleEvent1, end1), - Sets.newHashSet(startEvent, middleEvent2, end1) - ), resultingPatterns); + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end1), + Lists.newArrayList(startEvent, middleEvent1, end1), + Lists.newArrayList(startEvent, middleEvent2, end1) + )); } @Test @@ -1113,30 +965,17 @@ public class NFAITCase extends TestLogger { NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - Set<Set<Event>> resultingPatterns = new HashSet<>(); - List<Collection<Event>> allPatterns = new ArrayList<>(); - - for (StreamRecord<Event> inputEvent : inputEvents) { - Collection<Map<String, Event>> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map<String, Event> foundPattern : patterns) { - resultingPatterns.add(new HashSet<>(foundPattern.values())); - allPatterns.add(foundPattern.values()); - } - } + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(7, allPatterns.size()); - assertEquals(Sets.newHashSet( - Sets.newHashSet(startEvent1, startEvent2, startEvent3, end1), - Sets.newHashSet(startEvent1, startEvent2, end1), - Sets.newHashSet(startEvent1, startEvent3, end1), - Sets.newHashSet(startEvent2, startEvent3, end1), - Sets.newHashSet(startEvent1, end1), - Sets.newHashSet(startEvent2, end1), - Sets.newHashSet(startEvent3, end1) - ), resultingPatterns); + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(startEvent1, startEvent2, startEvent3, end1), + Lists.newArrayList(startEvent1, startEvent2, end1), + Lists.newArrayList(startEvent1, startEvent3, end1), + Lists.newArrayList(startEvent2, startEvent3, end1), + Lists.newArrayList(startEvent1, end1), + Lists.newArrayList(startEvent2, end1), + Lists.newArrayList(startEvent3, end1) + )); } @Test @@ -1181,24 +1020,11 @@ public class NFAITCase extends TestLogger { NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - Set<Set<Event>> resultingPatterns = new HashSet<>(); - List<Collection<Event>> allPatterns = new ArrayList<>(); - - for (StreamRecord<Event> inputEvent : inputEvents) { - Collection<Map<String, Event>> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map<String, Event> foundPattern : patterns) { - resultingPatterns.add(new HashSet<>(foundPattern.values())); - allPatterns.add(foundPattern.values()); - } - } + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(1, allPatterns.size()); - assertEquals(Sets.<Set<Event>>newHashSet( - Sets.newHashSet(startEvent, endEvent) - ), resultingPatterns); + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(startEvent, endEvent) + )); } @Test @@ -1291,25 +1117,12 @@ public class NFAITCase extends TestLogger { NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - Set<Set<Event>> resultingPatterns = new HashSet<>(); - List<Collection<Event>> allPatterns = new ArrayList<>(); - - for (StreamRecord<Event> inputEvent : inputEvents) { - Collection<Map<String, Event>> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map<String, Event> foundPattern : patterns) { - resultingPatterns.add(new HashSet<>(foundPattern.values())); - allPatterns.add(foundPattern.values()); - } - } + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(2, allPatterns.size()); - assertEquals(Sets.newHashSet( - Sets.newHashSet(startEvent, middleEvent, end1), - Sets.newHashSet(startEvent, end1) - ), resultingPatterns); + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(startEvent, middleEvent, end1), + Lists.newArrayList(startEvent, end1) + )); } @Test @@ -1602,25 +1415,12 @@ public class NFAITCase extends TestLogger { NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - Set<Set<Event>> resultingPatterns = new HashSet<>(); - List<Collection<Event>> allPatterns = new ArrayList<>(); - - for (StreamRecord<Event> inputEvent : inputEvents) { - Collection<Map<String, Event>> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map<String, Event> foundPattern : patterns) { - resultingPatterns.add(new HashSet<>(foundPattern.values())); - allPatterns.add(foundPattern.values()); - } - } + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(2, allPatterns.size()); - assertEquals(Sets.newHashSet( - Sets.newHashSet(startEvent, end1), - Sets.newHashSet(end1) - ), resultingPatterns); + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(startEvent, end1), + Lists.newArrayList(end1) + )); } @Test @@ -1655,27 +1455,14 @@ public class NFAITCase extends TestLogger { NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - Set<Set<Event>> resultingPatterns = new HashSet<>(); - List<Collection<Event>> allPatterns = new ArrayList<>(); - - for (StreamRecord<Event> inputEvent : inputEvents) { - Collection<Map<String, Event>> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map<String, Event> foundPattern : patterns) { - resultingPatterns.add(new HashSet<>(foundPattern.values())); - allPatterns.add(foundPattern.values()); - } - } + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(4, allPatterns.size()); - assertEquals(Sets.newHashSet( - Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3), - Sets.newHashSet(startEvent, middleEvent1, middleEvent2), - Sets.newHashSet(startEvent, middleEvent1), - Sets.newHashSet(startEvent) - ), resultingPatterns); + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3), + Lists.newArrayList(startEvent, middleEvent1, middleEvent2), + Lists.newArrayList(startEvent, middleEvent1), + Lists.newArrayList(startEvent) + )); } @Test @@ -1749,25 +1536,12 @@ public class NFAITCase extends TestLogger { NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - Set<Set<Event>> resultingPatterns = new HashSet<>(); - List<Collection<Event>> allPatterns = new ArrayList<>(); - - for (StreamRecord<Event> inputEvent : inputEvents) { - Collection<Map<String, Event>> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map<String, Event> foundPattern : patterns) { - resultingPatterns.add(new HashSet<>(foundPattern.values())); - allPatterns.add(foundPattern.values()); - } - } + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(2, allPatterns.size()); - assertEquals(Sets.newHashSet( - Sets.newHashSet(startEvent, middleEvent1), - Sets.newHashSet(startEvent) - ), resultingPatterns); + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(startEvent, middleEvent1), + Lists.newArrayList(startEvent) + )); } @Test @@ -1972,7 +1746,7 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).times(2).consecutive().optional().followedBy("end1").where(new SimpleCondition<Event>() { // TODO: 4/4/17 also check order consecutive() vs optional() + }).times(2).consecutive().optional().followedBy("end1").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -2063,7 +1837,7 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).times(2).consecutive().optional().followedBy("end1").where(new SimpleCondition<Event>() { // TODO: 4/4/17 also check order consecutive() vs optional() + }).times(2).consecutive().optional().followedBy("end1").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -2108,7 +1882,7 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).times(2).allowCombinations().optional().followedBy("end1").where(new SimpleCondition<Event>() { // TODO: 4/4/17 also check order consecutive() vs optional() + }).times(2).allowCombinations().optional().followedBy("end1").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -3165,26 +2939,28 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(d, 5)); Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = 5167288560432018992L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } }).notNext("notPattern").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 2242479288129905510L; + @Override public boolean filter(Event value) throws Exception { return value.getName().equals("b"); } }).followedByAny("middle").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = 1404509325548220892L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } }).followedBy("end").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = -8907427230007830915L; @Override public boolean filter(Event value) throws Exception { @@ -3219,26 +2995,28 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(d, 5)); Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = -339500190577666439L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } }).notNext("notPattern").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = -6913980632538046451L; + @Override public boolean filter(Event value) throws Exception { return value.getName().equals("b"); } }).followedBy("middle").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = 3332196998905139891L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } }).followedBy("end").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = 2086563479959018387L; @Override public boolean filter(Event value) throws Exception { @@ -3270,27 +3048,29 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(b1, 5)); Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = 1672995058886176627L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } }).followedByAny("middle").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = 6003621617520261554L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } }).followedByAny("end").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = 887700237024758417L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("d"); } }).notNext("notPattern").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5239529076086933032L; + @Override public boolean filter(Event value) throws Exception { return value.getName().equals("b"); @@ -3321,26 +3101,28 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(d, 5)); Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = -2641662468313191976L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } }).notFollowedBy("notPattern").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = -3632144132379494778L; + @Override public boolean filter(Event value) throws Exception { return value.getName().equals("b"); } }).followedByAny("middle").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = 3818766882138348167L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } }).followedBy("end").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = 2033204730795451288L; @Override public boolean filter(Event value) throws Exception { @@ -3374,26 +3156,28 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(d, 5)); Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = -2454396370205097543L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } }).notFollowedBy("notPattern").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 2749547391611263290L; + @Override public boolean filter(Event value) throws Exception { return value.getName().equals("b"); } }).followedByAny("middle").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = -4989511337298217255L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } }).optional().followedBy("end").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = -8466223836652936608L; @Override public boolean filter(Event value) throws Exception { @@ -3427,26 +3211,28 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(d, 5)); Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = -2568839911852184515L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } }).followedByAny("middle").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = -3632232424064269636L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("b"); } }).times(2).notFollowedBy("notPattern").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 3685596793523534611L; + @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } }).followedBy("end").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = 1960758663575587243L; @Override public boolean filter(Event value) throws Exception { @@ -3482,26 +3268,28 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(d2, 5)); Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = 2814850350025111940L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } }).notFollowedBy("notPattern").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 4988756153568853834L; + @Override public boolean filter(Event value) throws Exception { return value.getName().equals("b"); } }).followedByAny("middle").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = -225909103322018778L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } }).times(2).optional().followedBy("end").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = -924294627956373696L; @Override public boolean filter(Event value) throws Exception { @@ -3539,26 +3327,28 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(d2, 5)); Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = 6193105689601702341L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } }).followedByAny("middle").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = 5195859580923169111L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("b"); } }).times(2).notFollowedBy("notPattern").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 4973027956103783831L; + @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } }).followedBy("end").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = 2724622546678984894L; @Override public boolean filter(Event value) throws Exception { @@ -3588,19 +3378,21 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(c2, 4)); Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = -4289351792573443294L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } }).notFollowedBy("notPattern").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = -4989574608417523507L; + @Override public boolean filter(Event value) throws Exception { return value.getName().equals("b"); } }).followedByAny("end").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = -5940131818629290579L; @Override public boolean filter(Event value) throws Exception { @@ -3635,26 +3427,28 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(d, 5)); Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = -7885381452276160322L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } }).notFollowedBy("notPattern").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 3471511260235826653L; + @Override public boolean filter(Event value) throws Exception { return value.getName().equals("b"); } }).followedByAny("middle").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = 9073793782452363833L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } }).times(2).optional().followedBy("end").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = 7972902718259767076L; @Override public boolean filter(Event value) throws Exception { @@ -3690,26 +3484,28 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(d, 6)); Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = -7866220136345465444L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } }).notFollowedBy("notPattern").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 4957837489028234932L; + @Override public boolean filter(Event value) throws Exception { return value.getName().equals("b"); } }).followedBy("middle").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = 5569569968862808007L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } }).followedBy("end").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + private static final long serialVersionUID = -8579678167937416269L; @Override public boolean filter(Event value) throws Exception { @@ -4125,12 +3921,16 @@ public class NFAITCase extends TestLogger { List<List<Event>> resultingPatterns = new ArrayList<>(); for (StreamRecord<Event> inputEvent : inputEvents) { - Collection<Map<String, Event>> patterns = nfa.process( + Collection<Map<String, List<Event>>> patterns = nfa.process( inputEvent.getValue(), inputEvent.getTimestamp()).f0; - for (Map<String, Event> p: patterns) { - resultingPatterns.add(new ArrayList<>(p.values())); + for (Map<String, List<Event>> p: patterns) { + List<Event> res = new ArrayList<>(); + for (List<Event> le: p.values()) { + res.addAll(le); + } + resultingPatterns.add(res); } } return resultingPatterns; http://git-wip-us.apache.org/repos/asf/flink/blob/ae9c9d06/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java index d2e392b..11d193a 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java @@ -82,20 +82,20 @@ public class NFATest extends TestLogger { nfa.addState(endState); nfa.addState(endingState); - Set<Map<String, Event>> expectedPatterns = new HashSet<>(); + Set<Map<String, List<Event>>> expectedPatterns = new HashSet<>(); - Map<String, Event> firstPattern = new HashMap<>(); - firstPattern.put("start", new Event(1, "start", 1.0)); - firstPattern.put("end", new Event(4, "end", 4.0)); + Map<String, List<Event>> firstPattern = new HashMap<>(); + firstPattern.put("start", Collections.singletonList(new Event(1, "start", 1.0))); + firstPattern.put("end", Collections.singletonList(new Event(4, "end", 4.0))); - Map<String, Event> secondPattern = new HashMap<>(); - secondPattern.put("start", new Event(3, "start", 3.0)); - secondPattern.put("end", new Event(4, "end", 4.0)); + Map<String, List<Event>> secondPattern = new HashMap<>(); + secondPattern.put("start", Collections.singletonList(new Event(3, "start", 3.0))); + secondPattern.put("end", Collections.singletonList(new Event(4, "end", 4.0))); expectedPatterns.add(firstPattern); expectedPatterns.add(secondPattern); - Collection<Map<String, Event>> actualPatterns = runNFA(nfa, streamEvents); + Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, streamEvents); assertEquals(expectedPatterns, actualPatterns); } @@ -110,15 +110,15 @@ public class NFATest extends TestLogger { streamEvents.add(new StreamRecord<>(new Event(3, "start", 3.0), 3L)); streamEvents.add(new StreamRecord<>(new Event(4, "end", 4.0), 4L)); - Set<Map<String, Event>> expectedPatterns = new HashSet<>(); + Set<Map<String, List<Event>>> expectedPatterns = new HashSet<>(); - Map<String, Event> secondPattern = new HashMap<>(); - secondPattern.put("start", new Event(3, "start", 3.0)); - secondPattern.put("end", new Event(4, "end", 4.0)); + Map<String, List<Event>> secondPattern = new HashMap<>(); + secondPattern.put("start", Collections.singletonList(new Event(3, "start", 3.0))); + secondPattern.put("end", Collections.singletonList(new Event(4, "end", 4.0))); expectedPatterns.add(secondPattern); - Collection<Map<String, Event>> actualPatterns = runNFA(nfa, streamEvents); + Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, streamEvents); assertEquals(expectedPatterns, actualPatterns); } @@ -135,9 +135,9 @@ public class NFATest extends TestLogger { streamEvents.add(new StreamRecord<>(new Event(1, "start", 1.0), 1L)); streamEvents.add(new StreamRecord<>(new Event(2, "end", 2.0), 3L)); - Set<Map<String, Event>> expectedPatterns = Collections.emptySet(); + Set<Map<String, List<Event>>> expectedPatterns = Collections.emptySet(); - Collection<Map<String, Event>> actualPatterns = runNFA(nfa, streamEvents); + Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, streamEvents); assertEquals(expectedPatterns, actualPatterns); } @@ -156,40 +156,24 @@ public class NFATest extends TestLogger { streamEvents.add(new StreamRecord<>(new Event(3, "foobar", 3.0), 3L)); streamEvents.add(new StreamRecord<>(new Event(4, "end", 4.0), 3L)); - Set<Map<String, Event>> expectedPatterns = new HashSet<>(); + Set<Map<String, List<Event>>> expectedPatterns = new HashSet<>(); - Map<String, Event> secondPattern = new HashMap<>(); - secondPattern.put("start", new Event(2, "start", 2.0)); - secondPattern.put("end", new Event(4, "end", 4.0)); + Map<String, List<Event>> secondPattern = new HashMap<>(); + secondPattern.put("start", Collections.singletonList(new Event(2, "start", 2.0))); + secondPattern.put("end", Collections.singletonList(new Event(4, "end", 4.0))); expectedPatterns.add(secondPattern); - Collection<Map<String, Event>> actualPatterns = runNFA(nfa, streamEvents); + Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, streamEvents); assertEquals(expectedPatterns, actualPatterns); } - @Test - public void testStateNameGeneration() { - String expectedName1 = "a[2]"; - String expectedName2 = "a_3"; - String expectedName3 = "a[][42]"; - - String generatedName1 = NFA.generateStateName("a[]", 2); - String generatedName2 = NFA.generateStateName("a", 3); - String generatedName3 = NFA.generateStateName("a[][]", 42); - - - assertEquals(expectedName1, generatedName1); - assertEquals(expectedName2, generatedName2); - assertEquals(expectedName3, generatedName3); - } - - public <T> Collection<Map<String, T>> runNFA(NFA<T> nfa, List<StreamRecord<T>> inputs) { - Set<Map<String, T>> actualPatterns = new HashSet<>(); + public <T> Collection<Map<String, List<T>>> runNFA(NFA<T> nfa, List<StreamRecord<T>> inputs) { + Set<Map<String, List<T>>> actualPatterns = new HashSet<>(); for (StreamRecord<T> streamEvent : inputs) { - Collection<Map<String, T>> matchedPatterns = nfa.process( + Collection<Map<String, List<T>>> matchedPatterns = nfa.process( streamEvent.getValue(), streamEvent.getTimestamp()).f0; http://git-wip-us.apache.org/repos/asf/flink/blob/ae9c9d06/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java index adc07b3..2da3c31 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java @@ -18,7 +18,8 @@ package org.apache.flink.cep.nfa; -import com.google.common.collect.LinkedHashMultimap; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ListMultimap; import org.apache.flink.cep.Event; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -48,12 +49,12 @@ public class SharedBufferTest extends TestLogger { events[i] = new Event(i + 1, "e" + (i + 1), i); } - LinkedHashMultimap<String, Event> expectedPattern1 = LinkedHashMultimap.create(); + ListMultimap<String, Event> expectedPattern1 = ArrayListMultimap.create(); expectedPattern1.put("a1", events[2]); expectedPattern1.put("a[]", events[3]); expectedPattern1.put("b", events[5]); - LinkedHashMultimap<String, Event> expectedPattern2 = LinkedHashMultimap.create(); + ListMultimap<String, Event> expectedPattern2 = ArrayListMultimap.create(); expectedPattern2.put("a1", events[0]); expectedPattern2.put("a[]", events[1]); expectedPattern2.put("a[]", events[2]); @@ -61,7 +62,7 @@ public class SharedBufferTest extends TestLogger { expectedPattern2.put("a[]", events[4]); expectedPattern2.put("b", events[5]); - LinkedHashMultimap<String, Event> expectedPattern3 = LinkedHashMultimap.create(); + ListMultimap<String, Event> expectedPattern3 = ArrayListMultimap.create(); expectedPattern3.put("a1", events[0]); expectedPattern3.put("a[]", events[1]); expectedPattern3.put("a[]", events[2]); @@ -84,11 +85,11 @@ public class SharedBufferTest extends TestLogger { sharedBuffer.put("a[]", events[6], timestamp, "a[]", events[5], timestamp, DeweyNumber.fromString("1.1")); sharedBuffer.put("b", events[7], timestamp, "a[]", events[6], timestamp, DeweyNumber.fromString("1.1.0")); - Collection<LinkedHashMultimap<String, Event>> patterns3 = sharedBuffer.extractPatterns("b", events[7], timestamp, DeweyNumber.fromString("1.1.0")); + Collection<ListMultimap<String, Event>> patterns3 = sharedBuffer.extractPatterns("b", events[7], timestamp, DeweyNumber.fromString("1.1.0")); sharedBuffer.release("b", events[7], timestamp); - Collection<LinkedHashMultimap<String, Event>> patterns4 = sharedBuffer.extractPatterns("b", events[7], timestamp, DeweyNumber.fromString("1.1.0")); - Collection<LinkedHashMultimap<String, Event>> patterns1 = sharedBuffer.extractPatterns("b", events[5], timestamp, DeweyNumber.fromString("2.0.0")); - Collection<LinkedHashMultimap<String, Event>> patterns2 = sharedBuffer.extractPatterns("b", events[5], timestamp, DeweyNumber.fromString("1.0.0")); + Collection<ListMultimap<String, Event>> patterns4 = sharedBuffer.extractPatterns("b", events[7], timestamp, DeweyNumber.fromString("1.1.0")); + Collection<ListMultimap<String, Event>> patterns1 = sharedBuffer.extractPatterns("b", events[5], timestamp, DeweyNumber.fromString("2.0.0")); + Collection<ListMultimap<String, Event>> patterns2 = sharedBuffer.extractPatterns("b", events[5], timestamp, DeweyNumber.fromString("1.0.0")); sharedBuffer.release("b", events[5], timestamp); assertEquals(1L, patterns3.size()); http://git-wip-us.apache.org/repos/asf/flink/blob/ae9c9d06/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java index 90a6321..26b8ce9 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java @@ -18,7 +18,6 @@ package org.apache.flink.cep.nfa.compiler; -import com.google.common.collect.Sets; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; http://git-wip-us.apache.org/repos/asf/flink/blob/ae9c9d06/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java index 2f7cdeb..afb3e7c 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java @@ -36,6 +36,7 @@ import org.apache.flink.streaming.util.OperatorSnapshotUtil; import org.junit.Ignore; import org.junit.Test; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentLinkedQueue; @@ -72,7 +73,7 @@ public class CEPFrom12MigrationTest { final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0); final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10.0); - OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = + OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = new KeyedOneInputStreamOperatorTestHarness<>( new KeyedCEPPatternOperator<>( Event.createTypeSerializer(), @@ -120,7 +121,7 @@ public class CEPFrom12MigrationTest { final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10.0); final Event endEvent = new Event(42, "end", 1.0); - OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = + OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = new KeyedOneInputStreamOperatorTestHarness<>( new KeyedCEPPatternOperator<>( Event.createTypeSerializer(), @@ -160,18 +161,18 @@ public class CEPFrom12MigrationTest { assertTrue(resultRecord2.getValue() instanceof Map); @SuppressWarnings("unchecked") - Map<String, Event> patternMap1 = (Map<String, Event>) resultRecord1.getValue(); + Map<String, List<Event>> patternMap1 = (Map<String, List<Event>>) resultRecord1.getValue(); - assertEquals(startEvent, patternMap1.get("start")); - assertEquals(middleEvent1, patternMap1.get("middle")); - assertEquals(endEvent, patternMap1.get("end")); + assertEquals(startEvent, patternMap1.get("start").get(0)); + assertEquals(middleEvent1, patternMap1.get("middle").get(0)); + assertEquals(endEvent, patternMap1.get("end").get(0)); @SuppressWarnings("unchecked") - Map<String, Event> patternMap2 = (Map<String, Event>) resultRecord2.getValue(); + Map<String, List<Event>> patternMap2 = (Map<String, List<Event>>) resultRecord2.getValue(); - assertEquals(startEvent, patternMap2.get("start")); - assertEquals(middleEvent2, patternMap2.get("middle")); - assertEquals(endEvent, patternMap2.get("end")); + assertEquals(startEvent, patternMap2.get("start").get(0)); + assertEquals(middleEvent2, patternMap2.get("middle").get(0)); + assertEquals(endEvent, patternMap2.get("end").get(0)); harness.close(); } @@ -195,7 +196,7 @@ public class CEPFrom12MigrationTest { final Event startEvent1 = new Event(42, "start", 1.0); final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0); - OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = + OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = new KeyedOneInputStreamOperatorTestHarness<>( new KeyedCEPPatternOperator<>( Event.createTypeSerializer(), @@ -241,7 +242,7 @@ public class CEPFrom12MigrationTest { final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10.0); final Event endEvent = new Event(42, "end", 1.0); - OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = + OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = new KeyedOneInputStreamOperatorTestHarness<>( new KeyedCEPPatternOperator<>( Event.createTypeSerializer(), @@ -287,25 +288,25 @@ public class CEPFrom12MigrationTest { assertTrue(resultRecord3.getValue() instanceof Map); @SuppressWarnings("unchecked") - Map<String, Event> patternMap1 = (Map<String, Event>) resultRecord1.getValue(); + Map<String, List<Event>> patternMap1 = (Map<String, List<Event>>) resultRecord1.getValue(); - assertEquals(startEvent1, patternMap1.get("start")); - assertEquals(middleEvent1, patternMap1.get("middle")); - assertEquals(endEvent, patternMap1.get("end")); + assertEquals(startEvent1, patternMap1.get("start").get(0)); + assertEquals(middleEvent1, patternMap1.get("middle").get(0)); + assertEquals(endEvent, patternMap1.get("end").get(0)); @SuppressWarnings("unchecked") - Map<String, Event> patternMap2 = (Map<String, Event>) resultRecord2.getValue(); + Map<String, List<Event>> patternMap2 = (Map<String, List<Event>>) resultRecord2.getValue(); - assertEquals(startEvent1, patternMap2.get("start")); - assertEquals(middleEvent2, patternMap2.get("middle")); - assertEquals(endEvent, patternMap2.get("end")); + assertEquals(startEvent1, patternMap2.get("start").get(0)); + assertEquals(middleEvent2, patternMap2.get("middle").get(0)); + assertEquals(endEvent, patternMap2.get("end").get(0)); @SuppressWarnings("unchecked") - Map<String, Event> patternMap3 = (Map<String, Event>) resultRecord3.getValue(); + Map<String, List<Event>> patternMap3 = (Map<String, List<Event>>) resultRecord3.getValue(); - assertEquals(startEvent2, patternMap3.get("start")); - assertEquals(middleEvent2, patternMap3.get("middle")); - assertEquals(endEvent, patternMap3.get("end")); + assertEquals(startEvent2, patternMap3.get("start").get(0)); + assertEquals(middleEvent2, patternMap3.get("middle").get(0)); + assertEquals(endEvent, patternMap3.get("end").get(0)); harness.close(); } @@ -328,7 +329,7 @@ public class CEPFrom12MigrationTest { final Event startEvent1 = new Event(42, "start", 1.0); - OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = + OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = new KeyedOneInputStreamOperatorTestHarness<>( new KeyedCEPPatternOperator<>( Event.createTypeSerializer(), @@ -367,7 +368,7 @@ public class CEPFrom12MigrationTest { final Event startEvent1 = new Event(42, "start", 1.0); - OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = + OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = new KeyedOneInputStreamOperatorTestHarness<>( new KeyedCEPPatternOperator<>( Event.createTypeSerializer(), @@ -401,9 +402,9 @@ public class CEPFrom12MigrationTest { assertTrue(resultRecord.getValue() instanceof Map); @SuppressWarnings("unchecked") - Map<String, Event> patternMap = (Map<String, Event>) resultRecord.getValue(); + Map<String, List<Event>> patternMap = (Map<String, List<Event>>) resultRecord.getValue(); - assertEquals(startEvent1, patternMap.get("start")); + assertEquals(startEvent1, patternMap.get("start").get(0)); harness.close(); } http://git-wip-us.apache.org/repos/asf/flink/blob/ae9c9d06/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java index 4e05fcf..404de54 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java @@ -37,6 +37,7 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.junit.Test; import java.net.URL; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentLinkedQueue; @@ -95,7 +96,7 @@ public class CEPMigration11to13Test { harness.close(); */ - OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = + OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = new KeyedOneInputStreamOperatorTestHarness<>( new KeyedCEPPatternOperator<>( Event.createTypeSerializer(), @@ -129,11 +130,11 @@ public class CEPMigration11to13Test { assertTrue(resultRecord.getValue() instanceof Map); @SuppressWarnings("unchecked") - Map<String, Event> patternMap = (Map<String, Event>) resultRecord.getValue(); + Map<String, List<Event>> patternMap = (Map<String, List<Event>>) resultRecord.getValue(); - assertEquals(startEvent, patternMap.get("start")); - assertEquals(middleEvent, patternMap.get("middle")); - assertEquals(endEvent, patternMap.get("end")); + assertEquals(startEvent, patternMap.get("start").get(0)); + assertEquals(middleEvent, patternMap.get("middle").get(0)); + assertEquals(endEvent, patternMap.get("end").get(0)); harness.close(); } @@ -170,7 +171,7 @@ public class CEPMigration11to13Test { NullByteKeySelector keySelector = new NullByteKeySelector(); - OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = + OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = new KeyedOneInputStreamOperatorTestHarness<>( new KeyedCEPPatternOperator<>( Event.createTypeSerializer(), @@ -204,11 +205,11 @@ public class CEPMigration11to13Test { assertTrue(resultRecord.getValue() instanceof Map); @SuppressWarnings("unchecked") - Map<String, Event> patternMap = (Map<String, Event>) resultRecord.getValue(); + Map<String, List<Event>> patternMap = (Map<String, List<Event>>) resultRecord.getValue(); - assertEquals(startEvent, patternMap.get("start")); - assertEquals(middleEvent, patternMap.get("middle")); - assertEquals(endEvent, patternMap.get("end")); + assertEquals(startEvent, patternMap.get("start").get(0)); + assertEquals(middleEvent, patternMap.get("middle").get(0)); + assertEquals(endEvent, patternMap.get("end").get(0)); harness.close(); } http://git-wip-us.apache.org/repos/asf/flink/blob/ae9c9d06/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java index 4048bc2..5ed8b46 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java @@ -46,7 +46,9 @@ import org.junit.rules.TemporaryFolder; import static org.junit.Assert.*; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Queue; @@ -58,7 +60,7 @@ public class CEPOperatorTest extends TestLogger { @Test public void testKeyedCEPOperatorWatermarkForwarding() throws Exception { - OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = getCepTestHarness(false); + OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(false); harness.open(); @@ -74,7 +76,7 @@ public class CEPOperatorTest extends TestLogger { @Test public void testKeyedCEPOperatorCheckpointing() throws Exception { - OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = getCepTestHarness(false); + OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(false); harness.open(); @@ -138,7 +140,7 @@ public class CEPOperatorTest extends TestLogger { RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend()); rocksDBStateBackend.setDbStoragePath(rocksDbPath); - OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = getCepTestHarness(false); + OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(false); harness.setStateBackend(rocksDBStateBackend); @@ -208,7 +210,6 @@ public class CEPOperatorTest extends TestLogger { * Tests that the internal time of a CEP operator advances only given watermarks. See FLINK-5033 */ @Test - @SuppressWarnings("unchecked") public void testKeyedAdvancingTimeWithoutElements() throws Exception { final KeySelector<Event, Integer> keySelector = new TestKeySelector(); @@ -216,10 +217,10 @@ public class CEPOperatorTest extends TestLogger { final long watermarkTimestamp1 = 5L; final long watermarkTimestamp2 = 13L; - final Map<String, Event> expectedSequence = new HashMap<>(2); - expectedSequence.put("start", startEvent); + final Map<String, List<Event>> expectedSequence = new HashMap<>(2); + expectedSequence.put("start", Collections.<Event>singletonList(startEvent)); - OneInputStreamOperatorTestHarness<Event, Either<Tuple2<Map<String, Event>, Long>, Map<String, Event>>> harness = new KeyedOneInputStreamOperatorTestHarness<>( + OneInputStreamOperatorTestHarness<Event, Either<Tuple2<Map<String, List<Event>>, Long>, Map<String, List<Event>>>> harness = new KeyedOneInputStreamOperatorTestHarness<>( new TimeoutKeyedCEPPatternOperator<>( Event.createTypeSerializer(), false, @@ -234,7 +235,7 @@ public class CEPOperatorTest extends TestLogger { try { harness.setup( new KryoSerializer<>( - (Class<Either<Tuple2<Map<String, Event>, Long>, Map<String, Event>>>) (Object) Either.class, + (Class<Either<Tuple2<Map<String, List<Event>>, Long>, Map<String, List<Event>>>>) (Object) Either.class, new ExecutionConfig())); harness.open(); @@ -256,13 +257,15 @@ public class CEPOperatorTest extends TestLogger { assertTrue(resultObject instanceof StreamRecord); - StreamRecord<Either<Tuple2<Map<String, Event>, Long>, Map<String, Event>>> streamRecord = (StreamRecord<Either<Tuple2<Map<String,Event>,Long>,Map<String,Event>>>) resultObject; + StreamRecord<Either<Tuple2<Map<String, List<Event>>, Long>, Map<String, List<Event>>>> streamRecord = + (StreamRecord<Either<Tuple2<Map<String,List<Event>>,Long>,Map<String,List<Event>>>>) resultObject; assertTrue(streamRecord.getValue() instanceof Either.Left); - Either.Left<Tuple2<Map<String, Event>, Long>, Map<String, Event>> left = (Either.Left<Tuple2<Map<String, Event>, Long>, Map<String, Event>>) streamRecord.getValue(); + Either.Left<Tuple2<Map<String, List<Event>>, Long>, Map<String, List<Event>>> left = + (Either.Left<Tuple2<Map<String, List<Event>>, Long>, Map<String, List<Event>>>) streamRecord.getValue(); - Tuple2<Map<String, Event>, Long> leftResult = left.left(); + Tuple2<Map<String, List<Event>>, Long> leftResult = left.left(); assertEquals(watermarkTimestamp2, (long) leftResult.f1); assertEquals(expectedSequence, leftResult.f0); @@ -292,7 +295,7 @@ public class CEPOperatorTest extends TestLogger { TestKeySelector keySelector = new TestKeySelector(); KeyedCEPPatternOperator<Event, Integer> operator = getKeyedCepOpearator(false, keySelector); - OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = getCepTestHarness(operator); + OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(operator); harness.open(); @@ -380,7 +383,7 @@ public class CEPOperatorTest extends TestLogger { TestKeySelector keySelector = new TestKeySelector(); KeyedCEPPatternOperator<Event, Integer> operator = getKeyedCepOpearator(true, keySelector); - OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = getCepTestHarness(operator); + OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(operator); harness.open(); @@ -449,13 +452,13 @@ public class CEPOperatorTest extends TestLogger { assertTrue(resultRecord.getValue() instanceof Map); @SuppressWarnings("unchecked") - Map<String, Event> patternMap = (Map<String, Event>) resultRecord.getValue(); - assertEquals(start, patternMap.get("start")); - assertEquals(middle, patternMap.get("middle")); - assertEquals(end, patternMap.get("end")); + Map<String, List<Event>> patternMap = (Map<String, List<Event>>) resultRecord.getValue(); + assertEquals(start, patternMap.get("start").get(0)); + assertEquals(middle, patternMap.get("middle").get(0)); + assertEquals(end, patternMap.get("end").get(0)); } - private OneInputStreamOperatorTestHarness<Event, Map<String, Event>> getCepTestHarness(boolean isProcessingTime) throws Exception { + private OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> getCepTestHarness(boolean isProcessingTime) throws Exception { KeySelector<Event, Integer> keySelector = new TestKeySelector(); return new KeyedOneInputStreamOperatorTestHarness<>( @@ -464,7 +467,7 @@ public class CEPOperatorTest extends TestLogger { BasicTypeInfo.INT_TYPE_INFO); } - private OneInputStreamOperatorTestHarness<Event, Map<String, Event>> getCepTestHarness( + private OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> getCepTestHarness( KeyedCEPPatternOperator<Event, Integer> cepOperator) throws Exception { KeySelector<Event, Integer> keySelector = new TestKeySelector(); http://git-wip-us.apache.org/repos/asf/flink/blob/ae9c9d06/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java index a048183..0210ef9 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java @@ -37,6 +37,7 @@ import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.junit.Test; +import java.util.List; import java.util.Map; import java.util.Queue; @@ -79,7 +80,7 @@ public class CEPRescalingTest { // now we start the test, we go from parallelism 1 to 2. - OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = + OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getTestHarness(maxParallelism, 1, 0); harness.open(); @@ -99,7 +100,7 @@ public class CEPRescalingTest { // so we initialize the two tasks and we put the rest of // the valid elements for the pattern on task 0. - OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness1 = + OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness1 = getTestHarness(maxParallelism, 2, 0); harness1.setup(); @@ -120,7 +121,7 @@ public class CEPRescalingTest { verifyWatermark(harness1.getOutput().poll(), 2); verifyPattern(harness1.getOutput().poll(), startEvent1, middleEvent1, endEvent1); - OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness2 = + OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness2 = getTestHarness(maxParallelism, 2, 1); harness2.setup(); @@ -198,15 +199,15 @@ public class CEPRescalingTest { // starting the test, we will go from parallelism of 3 to parallelism of 2 - OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness1 = + OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness1 = getTestHarness(maxParallelism, 3, 0); harness1.open(); - OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness2 = + OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness2 = getTestHarness(maxParallelism, 3, 1); harness2.open(); - OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness3 = + OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness3 = getTestHarness(maxParallelism, 3, 2); harness3.open(); @@ -251,13 +252,13 @@ public class CEPRescalingTest { harness3.snapshot(0, 0) ); - OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness4 = + OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness4 = getTestHarness(maxParallelism, 2, 0); harness4.setup(); harness4.initializeState(snapshot); harness4.open(); - OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness5 = + OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness5 = getTestHarness(maxParallelism, 2, 1); harness5.setup(); harness5.initializeState(snapshot); @@ -295,8 +296,8 @@ public class CEPRescalingTest { assertTrue(resultRecord.getValue() instanceof Map); @SuppressWarnings("unchecked") - Map<String, Event> patternMap = (Map<String, Event>) resultRecord.getValue(); - if (patternMap.get("start").getId() == 7) { + Map<String, List<Event>> patternMap = (Map<String, List<Event>>) resultRecord.getValue(); + if (patternMap.get("start").get(0).getId() == 7) { verifyPattern(harness4.getOutput().poll(), startEvent1, middleEvent1, endEvent1); verifyPattern(harness4.getOutput().poll(), startEvent3, middleEvent3, endEvent3); } else { @@ -327,13 +328,13 @@ public class CEPRescalingTest { assertTrue(resultRecord.getValue() instanceof Map); @SuppressWarnings("unchecked") - Map<String, Event> patternMap = (Map<String, Event>) resultRecord.getValue(); - assertEquals(start, patternMap.get("start")); - assertEquals(middle, patternMap.get("middle")); - assertEquals(end, patternMap.get("end")); + Map<String, List<Event>> patternMap = (Map<String, List<Event>>) resultRecord.getValue(); + assertEquals(start, patternMap.get("start").get(0)); + assertEquals(middle, patternMap.get("middle").get(0)); + assertEquals(end, patternMap.get("end").get(0)); } - private KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, Event>> getTestHarness( + private KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> getTestHarness( int maxParallelism, int taskParallelism, int subtaskIdx) throws Exception {