GitHub user dianfu commented on a diff in the pull request:
https://github.com/apache/flink/pull/5141#discussion_r159149194
--- Diff:
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
---
@@ -191,22 +191,28 @@ public boolean isEmpty() {
*/
public boolean prune(long pruningTimestamp) {
Iterator<Map.Entry<K, SharedBufferPage<K, V>>> iter =
pages.entrySet().iterator();
- boolean pruned = false;
+ List<SharedBufferEntry<K, V>> prunedEntries = new ArrayList<>();
while (iter.hasNext()) {
SharedBufferPage<K, V> page = iter.next().getValue();
- if (page.prune(pruningTimestamp)) {
- pruned = true;
- }
+ page.prune(pruningTimestamp, prunedEntries);
if (page.isEmpty()) {
// delete page if it is empty
iter.remove();
}
}
- return pruned;
+ if (!prunedEntries.isEmpty()) {
+ for (Map.Entry<K, SharedBufferPage<K, V>> entry :
pages.entrySet()) {
+ entry.getValue().removeEdges(prunedEntries);
+ }
+ prunedEntries.clear();
--- End diff --
Updated.
---