Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4143#discussion_r123713960
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -514,25 +524,46 @@ private void addStopStateToLooping(final State<T> 
loopingState) {
                 */
                @SuppressWarnings("unchecked")
                private State<T> createInitOptionalStateOfZeroOrMore(final 
State<T> loopingState, final State<T> lastSink) {
    -                   final IterativeCondition<T> currentCondition = 
(IterativeCondition<T>) currentPattern.getCondition();
    +                   final IterativeCondition<T> takeCondition = 
extendWithUntilCondition(
    +                           (IterativeCondition<T>) 
currentPattern.getCondition(),
    +                           (IterativeCondition<T>) 
currentPattern.getUntilCondition()
    +                   );
     
                        final State<T> firstState = 
createState(currentPattern.getName(), State.StateType.Normal);
                        firstState.addProceed(lastSink, 
BooleanConditions.<T>trueFunction());
    -                   firstState.addTake(loopingState, currentCondition);
    +                   firstState.addTake(loopingState, takeCondition);
     
                        final IterativeCondition<T> ignoreFunction = 
getIgnoreCondition(currentPattern);
                        if (ignoreFunction != null) {
                                final State<T> firstStateWithoutProceed = 
createState(currentPattern.getName(), State.StateType.Normal);
                                firstState.addIgnore(firstStateWithoutProceed, 
ignoreFunction);
                                
firstStateWithoutProceed.addIgnore(ignoreFunction);
    -                           firstStateWithoutProceed.addTake(loopingState, 
currentCondition);
    +                           firstStateWithoutProceed.addTake(loopingState, 
takeCondition);
     
                                addStopStates(firstStateWithoutProceed);
                        }
                        return firstState;
                }
     
                /**
    +            * This method extends the given condition with stop(until) 
condition if necessary.
    +            * The until condition needs to be applied only if both of the 
given conditions are not null.
    +            *
    +            * @param condition the condition to extend
    +            * @param untilCondition the until condition to join with the 
given condition
    +            * @return condition with AND applied or the original condition
    +            */
    +           private IterativeCondition<T> extendWithUntilCondition(
    +                           IterativeCondition<T> condition,
    +                           IterativeCondition<T> untilCondition) {
    +                   if (untilCondition != null && condition != null) {
    +                           return new AndCondition<>(new 
NotCondition<>(untilCondition), condition);
    +                   } else {
    +                           return condition;
    +                   }
    +           }
    +
    --- End diff --
    
    As written, this does not handle patterns that have only an until 
condition and no condition of their own, like: 
    
    ```
    Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
        private static final long serialVersionUID = 5726188262756267490L;

        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("c");
        }
    }).followedBy("middle").oneOrMore().until(UNTIL_CONDITION);
    ```
    
    To support this, I think changing the method body to the following will 
do the job:
    
    ```
    if (untilCondition != null && condition != null) {
        return new AndCondition<>(new NotCondition<>(untilCondition), condition);
    }
    if (condition != null) {
        return condition;
    }
    if (untilCondition != null) {
        return new NotCondition<>(untilCondition);
    }
    return null;
    ```
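    
    To make the four cases concrete, here is a minimal standalone sketch of 
    the same branching. It is not Flink code: it assumes 
    `java.util.function.Predicate` as a stand-in for `IterativeCondition`, 
    and the class name `UntilConditionSketch` is made up for illustration.
    
    ```java
    import java.util.function.Predicate;

    // Hypothetical stand-in for the suggested method body. Predicate replaces
    // Flink's IterativeCondition, negate() replaces NotCondition, and and()
    // replaces AndCondition. None of this is the actual NFACompiler code.
    final class UntilConditionSketch {

        static <T> Predicate<T> extendWithUntilCondition(
                Predicate<T> condition,
                Predicate<T> untilCondition) {
            // Both present: take only while the until condition does not hold.
            if (untilCondition != null && condition != null) {
                return untilCondition.negate().and(condition);
            }
            // Only the regular condition: leave it unchanged.
            if (condition != null) {
                return condition;
            }
            // Only the until condition: take everything until it holds.
            if (untilCondition != null) {
                return untilCondition.negate();
            }
            // Neither present: nothing to extend.
            return null;
        }

        public static void main(String[] args) {
            Predicate<String> isA = "a"::equals;
            Predicate<String> isStop = "stop"::equals;

            // Pattern with only an until condition, the case from this comment.
            Predicate<String> untilOnly = extendWithUntilCondition(null, isStop);
            System.out.println(untilOnly.test("a"));
            System.out.println(untilOnly.test("stop"));

            // Pattern with both a condition and an until condition.
            Predicate<String> both = extendWithUntilCondition(isA, isStop);
            System.out.println(both.test("a"));
            System.out.println(both.test("b"));
        }
    }
    ```
    
    The third branch is the one the current version is missing: with a `null` 
    condition, the until condition would otherwise be silently dropped.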
    
    To verify this, you could also add a test to `UntilConditionITCase`, 
like:
    
    ```
    List<StreamRecord<Event>> inputEvents = new ArrayList<>();

    Event startEvent = new Event(40, "c", 1.0);
    Event middleEvent1 = new Event(41, "a", 2.0);
    Event middleEvent2 = new Event(42, "a", 3.0);
    Event startEvent2 = new Event(40, "d", 1.0);
    Event breaking = new Event(44, "a", 5.0);
    Event ignored = new Event(45, "a", 6.0);

    inputEvents.add(new StreamRecord<>(startEvent, 1));
    inputEvents.add(new StreamRecord<>(middleEvent1, 3));
    inputEvents.add(new StreamRecord<>(middleEvent2, 4));
    inputEvents.add(new StreamRecord<>(startEvent2, 4));
    inputEvents.add(new StreamRecord<>(breaking, 6));
    inputEvents.add(new StreamRecord<>(ignored, 7));

    Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
        private static final long serialVersionUID = 5726188262756267490L;

        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("c");
        }
    }).followedBy("middle").oneOrMore().until(UNTIL_CONDITION);

    NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);

    final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    ```
    
    And you could also add one for the case that this method returns `null`.


