Repository: flink Updated Branches: refs/heads/master 1a062b796 -> a92d78746
[FLINK-5871] [cep] Enforce uniqueness of pattern names in CEP. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a92d7874 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a92d7874 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a92d7874 Branch: refs/heads/master Commit: a92d78746318e9bbe949d9d030fc21b5348b56e2 Parents: 1a062b7 Author: kl0u <[email protected]> Authored: Tue Feb 21 13:51:11 2017 +0100 Committer: kl0u <[email protected]> Committed: Tue Feb 28 09:35:49 2017 +0100 ---------------------------------------------------------------------- .../nfa/compiler/MalformedPatternException.java | 32 ++++++++++++++++++ .../flink/cep/nfa/compiler/NFACompiler.java | 10 ++++++ .../org/apache/flink/cep/pattern/Pattern.java | 2 +- .../flink/cep/nfa/compiler/NFACompilerTest.java | 34 ++++++++++++++++++++ 4 files changed, 77 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a92d7874/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/MalformedPatternException.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/MalformedPatternException.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/MalformedPatternException.java new file mode 100644 index 0000000..a3bb5f4 --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/MalformedPatternException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.nfa.compiler; + +/** + * An exception used to indicate that a {@link org.apache.flink.cep.pattern.Pattern} + * was not specified correctly. + */ +public class MalformedPatternException extends RuntimeException { + + private static final long serialVersionUID = 7751134834983361543L; + + public MalformedPatternException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a92d7874/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java index 878e0b2..18ed21f 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java @@ -34,6 +34,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Set; /** * Compiler class containing methods to compile a {@link Pattern} into a {@link NFA} or a @@ -84,12 +85,16 @@ public class NFACompiler { Map<String, State<T>> states = new HashMap<>(); long windowTime; + // this is used to enforse pattern name uniqueness. + Set<String> patternNames = new HashSet<>(); + Pattern<T, ?> succeedingPattern; State<T> succeedingState; Pattern<T, ?> currentPattern = pattern; // we're traversing the pattern from the end to the beginning --> the first state is the final state State<T> currentState = new State<>(currentPattern.getName(), State.StateType.Final); + patternNames.add(currentPattern.getName()); states.put(currentPattern.getName(), currentState); @@ -100,6 +105,11 @@ public class NFACompiler { succeedingState = currentState; currentPattern = currentPattern.getPrevious(); + if (!patternNames.add(currentPattern.getName())) { + throw new MalformedPatternException("Duplicate pattern name: " + currentPattern.getName() + ". " + + "Pattern names must be unique."); + } + Time currentWindowTime = currentPattern.getWindowTime(); if (currentWindowTime != null && currentWindowTime.toMilliseconds() < windowTime) { http://git-wip-us.apache.org/repos/asf/flink/blob/a92d7874/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java index 9269dcb..7ea675f 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java @@ -93,7 +93,7 @@ public class Pattern<T, F extends T> { } /** - * Specifies a filter condition which is ORed with an existing filter function. + * Specifies a filter condition which is OR'ed with an existing filter function. * * @param orFilterFunction OR filter condition * @return The same pattern operator where the new filter condition is set http://git-wip-us.apache.org/repos/asf/flink/blob/a92d7874/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java index c790c35..d11f3a8 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java @@ -30,7 +30,9 @@ import org.apache.flink.cep.nfa.StateTransition; import org.apache.flink.cep.nfa.StateTransitionAction; import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.util.TestLogger; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.util.Collection; import java.util.HashMap; @@ -42,6 +44,38 @@ import static org.junit.Assert.assertEquals; public class NFACompilerTest extends TestLogger { + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testNFACompilerUniquePatternName() { + + // adjust the rule + expectedException.expect(MalformedPatternException.class); + expectedException.expectMessage("Duplicate pattern name: start. Pattern names must be unique."); + + Pattern<Event, ?> invalidPattern = Pattern.<Event>begin("start").where(new TestFilter()) + .followedBy("middle").where(new TestFilter()) + .followedBy("start").where(new TestFilter()); + + // here we must have an exception because of the two "start" patterns with the same name. + NFACompiler.compile(invalidPattern, Event.createTypeSerializer(), false); + } + + /** + * A filter implementation to test invalid pattern specification with + * duplicate pattern names. Check {@link #testNFACompilerUniquePatternName()}. + */ + private static class TestFilter implements FilterFunction<Event> { + + private static final long serialVersionUID = -3863103355752267133L; + + @Override + public boolean filter(Event value) throws Exception { + throw new RuntimeException("It should never arrive here."); + } + } + /** * Tests that the NFACompiler generates the correct NFA from a given Pattern */
