Github user Aitozi commented on a diff in the pull request:
https://github.com/apache/flink/pull/6205#discussion_r197629310
--- Diff:
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws
Exception {
* @throws Exception Thrown if the system cannot access the state.
*/
public void releaseEvent(EventId eventId) throws Exception {
- Lockable<V> eventWrapper = eventsBuffer.get(eventId);
+ Lockable<V> eventWrapper = getEvent(eventId);
if (eventWrapper != null) {
if (eventWrapper.release()) {
eventsBuffer.remove(eventId);
+ eventsBufferCache.remove(eventId);
} else {
- eventsBuffer.put(eventId, eventWrapper);
+ cacheEvent(eventId, eventWrapper);
}
}
}
+ // Cache related method
+
+ /////////////////////////////////////////////
+ // Put
+ /////////////////////////////////////////////
+
+ /**
+ * Put a event to cache.
+ * @param eventId id of the event
--- End diff --
fixed
---