Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6059#discussion_r194468194
  
    --- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -296,42 +292,31 @@ public void resetNFAChanged() {
                        if (shouldDiscardPath) {
                                // a stop state was reached in this branch. 
release branch which results in removing previous event from
                                // the buffer
    -                           for (final ComputationState<T> state : 
statesToRetain) {
    -                                   eventSharedBuffer.release(
    -                                                   
NFAStateNameHandler.getOriginalNameFromInternal(
    -                                                                   
state.getPreviousState().getName()),
    -                                                   state.getEvent(),
    -                                                   state.getTimestamp(),
    -                                                   state.getCounter());
    +                           for (final ComputationState state : 
statesToRetain) {
    +                                   
sharedBuffer.releaseNode(state.getPreviousBufferEntry());
                                }
                        } else {
                                computationStates.addAll(statesToRetain);
                        }
     
                }
     
    -           discardComputationStatesAccordingToStrategy(computationStates, 
result, afterMatchSkipStrategy);
    -
    -           // prune shared buffer based on window length
    -           if (windowTime > 0L) {
    -                   long pruningTimestamp = timestamp - windowTime;
    -
    -                   if (pruningTimestamp < timestamp) {
    -                           // the check is to guard against underflows
    +           discardComputationStatesAccordingToStrategy(
    +                   sharedBuffer, computationStates, result, 
afterMatchSkipStrategy);
     
    -                           // remove all elements which are expired
    -                           // with respect to the window length
    -                           if (eventSharedBuffer.prune(pruningTimestamp)) {
    -                                   nfaChanged = true;
    -                           }
    -                   }
    +           if (event.getEvent() == null) {
    --- End diff --
    
    Why here `==null`? I may be missing sth.


---

Reply via email to