Github user smarthi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2367#discussion_r74711517
  
    --- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
 ---
    @@ -256,16 +267,75 @@ public void prune(long pruningTimestamp) {
                                                                                
edge.getTarget(),
                                                                                
edge.getVersion(),
                                                                                
copy));
    +                                                           if 
(matchingBehaviour == MatchingBehaviour.AFTER_LAST) {
    +                                                                   
cleanUp.add(edge.getTarget());
    +                                                           }
                                                        }
                                                }
                                        }
                                }
                        }
                }
     
    +           // Remove shared buffer entries to maintain correct matching 
behaviour
    +           doCleanUp(new Predicate<K, V>() {
    +
    +                   @Override
    +                   public boolean toRemove(SharedBufferEntry<K, V> entry) {
    +                           return cleanUp.contains(entry);
    +                   }
    +           });
    +           // Remove all entries that are dependent on the current event
    +           if (matchingBehaviour == MatchingBehaviour.AFTER_LAST) {
    +                   doCleanUp(new Predicate<K, V>() {
    +
    +                           @Override
    +                           public boolean toRemove(SharedBufferEntry<K, V> 
entry) {
    +                                   if (entry == null) {
    +                                           return false;
    +                                   }
    +                                   return entry.getValueTime().value == 
value
    +                                           && 
entry.getValueTime().timestamp == timestamp;
    +                           }
    +                   });
    +           }
    +
                return result;
        }
     
    +   private void doCleanUp(Predicate<K, V> predicate) {
    +           ArrayList<SharedBufferEntry<K, V>> toRemove = new ArrayList<>();
    +           for (SharedBufferPage<K, V> page : this.pages.values()) {
    +                   for (SharedBufferEntry<K, V> entry : page.getEntries()) 
{
    +                           if (entry.getReferenceCounter() <= 1) {
    +                                   doRecursiveCleanup(entry, predicate, 
toRemove);
    +                           }
    +                   }
    +           }
    +
    +           for (SharedBufferEntry<K, V> startNode: toRemove) {
    +                   release(startNode.page.getKey(), 
startNode.getValueTime().value, startNode.getValueTime().getTimestamp());
    +                   remove(startNode.page.getKey(), 
startNode.getValueTime().value, startNode.getValueTime().getTimestamp());
    +           }
    +   }
    +
    +   private boolean doRecursiveCleanup(SharedBufferEntry<K, V> startNode, 
Predicate<K, V> cleanUp, ArrayList<SharedBufferEntry<K, V>> toRemove) {
    --- End diff --
    
    Replace ArrayList by List in the arguments, unless we need it to be 
ArrayList explicitly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to