Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3477#discussion_r106142265
  
    --- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ---
    @@ -74,88 +76,233 @@
         */
        @SuppressWarnings("unchecked")
        public static <T> NFAFactory<T> compileFactory(
    -           Pattern<T, ?> pattern,
    -           TypeSerializer<T> inputTypeSerializer,
    +           final Pattern<T, ?> pattern,
    +           final TypeSerializer<T> inputTypeSerializer,
                boolean timeoutHandling) {
                if (pattern == null) {
                        // return a factory for empty NFAs
    -                   return new NFAFactoryImpl<T>(inputTypeSerializer, 0, 
Collections.<State<T>>emptyList(), timeoutHandling);
    +                   return new NFAFactoryImpl<>(inputTypeSerializer, 0, 
Collections.<State<T>>emptyList(), timeoutHandling);
                } else {
    -                   // set of all generated states
    -                   Map<String, State<T>> states = new HashMap<>();
    -                   long windowTime;
    +                   final NFAFactoryCompiler<T> nfaFactoryCompiler = new 
NFAFactoryCompiler<>(pattern);
    +                   nfaFactoryCompiler.compileFactory();
    +                   return new NFAFactoryImpl<>(inputTypeSerializer, 
nfaFactoryCompiler.getWindowTime(), nfaFactoryCompiler.getStates(), 
timeoutHandling);
    +           }
    +   }
     
    -                   // this is used to enforse pattern name uniqueness.
    -                   Set<String> patternNames = new HashSet<>();
    +   private static class NFAFactoryCompiler<T> {
     
    -                   Pattern<T, ?> succeedingPattern;
    -                   State<T> succeedingState;
    -                   Pattern<T, ?> currentPattern = pattern;
    +           private final Set<String> usedNames = new HashSet<>();
    +           private final List<State<T>> states = new ArrayList<>();
     
    +           private long windowTime = 0;
    +           private Pattern<T, ?> currentPattern;
    +
    +           NFAFactoryCompiler(final Pattern<T, ?> pattern) {
    +                   this.currentPattern = pattern;
    +           }
    +
    +           /**
    +            * Compiles the given pattern into a {@link NFAFactory}. The 
NFA factory can be used to create
    +            * multiple NFAs.
    +            */
    +           void compileFactory() {
                        // we're traversing the pattern from the end to the 
beginning --> the first state is the final state
    -                   State<T> currentState = new 
State<>(currentPattern.getName(), State.StateType.Final);
    -                   patternNames.add(currentPattern.getName());
    +                   State<T> sinkState = createEndingState();
    +                   // add all the normal states
    +                   sinkState = createMiddleStates(sinkState);
    +                   // add the beginning state
    +                   createStartState(sinkState);
    +           }
    +
    +           List<State<T>> getStates() {
    +                   return states;
    +           }
     
    -                   states.put(currentPattern.getName(), currentState);
    +           long getWindowTime() {
    +                   return windowTime;
    +           }
    +
    +           private State<T> createEndingState() {
    +                   State<T> sinkState = new State<>(ENDING_STATE_NAME, 
State.StateType.Final);
    +                   states.add(sinkState);
    +                   usedNames.add(ENDING_STATE_NAME);
     
                        windowTime = currentPattern.getWindowTime() != null ? 
currentPattern.getWindowTime().toMilliseconds() : 0L;
    +                   return sinkState;
    +           }
     
    -                   while (currentPattern.getPrevious() != null) {
    -                           succeedingPattern = currentPattern;
    -                           succeedingState = currentState;
    -                           currentPattern = currentPattern.getPrevious();
    +           private State<T> createMiddleStates(final State<T> sinkState) {
     
    -                           if 
(!patternNames.add(currentPattern.getName())) {
    -                                   throw new 
MalformedPatternException("Duplicate pattern name: " + currentPattern.getName() 
+ ". " +
    -                                           "Pattern names must be 
unique.");
    +                   State<T> lastSink = sinkState;
    +                   while (currentPattern.getPrevious() != null) {
    +                           State<T> sourceState;
    +
    +                           checkPatternNameUniqueness();
    +
    +                           sourceState = new 
State<>(currentPattern.getName(), State.StateType.Normal);
    +                           states.add(sourceState);
    +                           usedNames.add(sourceState.getName());
    +
    +                           if (currentPattern.getQuantifier().isLooping()) 
{
    +                                   convertToLooping(lastSink, sourceState);
    +
    +                                   if 
(currentPattern.getQuantifier().isAtLeastOne()) {
    +                                           sourceState = 
createFirstMandatoryStateOfLoop(
    +                                                   sourceState,
    +                                                   State.StateType.Normal
    +                                           );
    +                                           states.add(sourceState);
    +                                           
usedNames.add(sourceState.getName());
    +                                   }
    +                           } else if (currentPattern.getQuantifier() == 
Quantifier.TIMES) {
    +                                   sourceState = 
convertToTimesState(lastSink, sourceState, currentPattern.getTimes());
    +                           } else {
    +                                   convertToSingletonState(
    +                                           lastSink,
    +                                           sourceState);
                                }
     
    -                           Time currentWindowTime = 
currentPattern.getWindowTime();
    +                           currentPattern = currentPattern.getPrevious();
    +                           lastSink = sourceState;
     
    +                           final Time currentWindowTime = 
currentPattern.getWindowTime();
                                if (currentWindowTime != null && 
currentWindowTime.toMilliseconds() < windowTime) {
                                        // the window time is the global 
minimum of all window times of each state
                                        windowTime = 
currentWindowTime.toMilliseconds();
                                }
    +                   }
    +
    +                   return lastSink;
    +           }
     
    -                           if 
(states.containsKey(currentPattern.getName())) {
    -                                   currentState = 
states.get(currentPattern.getName());
    +           private void checkPatternNameUniqueness() {
    +                   if (usedNames.contains(currentPattern.getName())) {
    +                           throw new MalformedPatternException(
    +                                   "Duplicate pattern name: " + 
currentPattern.getName() + ". " +
    +                                   "Pattern names must be unique.");
    +                   }
    +           }
    +
    +           @SuppressWarnings("unchecked")
    +           private State<T> createStartState(State<T> sinkState) {
    +                   final State<T> beginningState;
    +
    +                   checkPatternNameUniqueness();
    +
    +                   if (currentPattern.getQuantifier().isLooping()) {
    +                           final State<T> loopingState;
    +                           if 
(currentPattern.getQuantifier().isAtLeastOne()) {
    +                                   loopingState = new 
State<>(currentPattern.getName(), State.StateType.Normal);
    +                                   beginningState = 
createFirstMandatoryStateOfLoop(loopingState, State.StateType.Start);
    +                                   states.add(loopingState);
                                } else {
    -                                   currentState = new 
State<>(currentPattern.getName(), State.StateType.Normal);
    -                                   states.put(currentState.getName(), 
currentState);
    +                                   loopingState = new 
State<>(currentPattern.getName(), State.StateType.Start);
    +                                   beginningState = loopingState;
                                }
    -
    -                           currentState.addStateTransition(new 
StateTransition<T>(
    -                                   StateTransitionAction.TAKE,
    -                                   succeedingState,
    -                                   (FilterFunction<T>) 
succeedingPattern.getFilterFunction()));
    -
    -                           if (succeedingPattern instanceof 
FollowedByPattern) {
    -                                   // the followed by pattern entails a 
reflexive ignore transition
    -                                   currentState.addStateTransition(new 
StateTransition<T>(
    -                                           StateTransitionAction.IGNORE,
    -                                           currentState,
    -                                           null
    -                                   ));
    +                           convertToLooping(sinkState, loopingState);
    +                   } else if (currentPattern.getQuantifier() == 
Quantifier.TIMES) {
    +                           if (currentPattern.getTimes() > 1) {
    +                                   final State<T> timesState = new 
State<>(currentPattern.getName(), State.StateType.Normal);
    +                                   states.add(timesState);
    +                                   sinkState = 
convertToTimesState(sinkState, timesState, currentPattern.getTimes() - 1);
                                }
    +                           beginningState = new 
State<>(currentPattern.getName(), State.StateType.Start);
    +                           beginningState.addTake(sinkState, 
(FilterFunction<T>) currentPattern.getFilterFunction());
    +                   } else {
    +                           beginningState = new 
State<>(currentPattern.getName(), State.StateType.Start);
    +                           beginningState.addTake(sinkState, 
(FilterFunction<T>) currentPattern.getFilterFunction());
                        }
     
    -                   // add the beginning state
    -                   final State<T> beginningState;
    +                   states.add(beginningState);
    +                   usedNames.add(beginningState.getName());
    +
    +                   return beginningState;
    +           }
    +
    +           private State<T> convertToTimesState(State<T> sinkState, 
State<T> sourceState, int times) {
    +                   convertToSingletonState(sinkState, sourceState);
    +                   for (int i = 0; i < times - 1; i++) {
    +                           sinkState = sourceState;
    +                           sourceState = new 
State<>(currentPattern.getName(), State.StateType.Normal);
    +                           states.add(sourceState);
    +                           convertToSingletonState(sinkState, sourceState);
    +                   }
    +                   return sourceState;
    +           }
    +
    +           @SuppressWarnings("unchecked")
    +           private void convertToSingletonState(
    +                   final State<T> sinkState,
    +                   final State<T> sourceState) {
    +
    +                   final FilterFunction<T> currentFilterFunction = 
(FilterFunction<T>) currentPattern.getFilterFunction();
    +                   final FilterFunction<T> trueFunction = 
FilterFunctions.trueFunction();
    +                   sourceState.addTake(sinkState, currentFilterFunction);
    +
    +                   final State<T> ignoreState;
    +                   if (currentPattern.getQuantifier() == 
Quantifier.OPTIONAL) {
    +                           sourceState.addProceed(sinkState, trueFunction);
    +                           ignoreState = new 
State<>(currentPattern.getName(), State.StateType.Normal);
     
    -                   if (states.containsKey(BEGINNING_STATE_NAME)) {
    -                           beginningState = 
states.get(BEGINNING_STATE_NAME);
    +                           ignoreState.addTake(sinkState, 
currentFilterFunction);
    +                           states.add(ignoreState);
                        } else {
    -                           beginningState = new 
State<>(BEGINNING_STATE_NAME, State.StateType.Start);
    -                           states.put(BEGINNING_STATE_NAME, 
beginningState);
    +                           ignoreState = sourceState;
    +                   }
    +
    +                   if (currentPattern instanceof FollowedByPattern) {
    +                           sourceState.addIgnore(ignoreState, 
trueFunction);
    +                   }
    +           }
    +
    +           @SuppressWarnings("unchecked")
    +           private State<T> createFirstMandatoryStateOfLoop(
    +                   final State<T> sinkState,
    +                   final State.StateType stateType) {
    +
    --- End diff --
    
    I agree that it is defensive. My reasons for keeping it are:
    1) Why not be defensive about these things? :P
    2) I believe the code is more readable when a method's assumptions are made explicit right at
    the start (although the method name can play the same role). If you see this check, you know
    immediately what is going on.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to