Github user zhangminglei commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6205#discussion_r197627410
  
    --- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ---
    @@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws 
Exception {
         * @throws Exception Thrown if the system cannot access the state.
         */
        public void releaseEvent(EventId eventId) throws Exception {
    -           Lockable<V> eventWrapper = eventsBuffer.get(eventId);
    +           Lockable<V> eventWrapper = getEvent(eventId);
                if (eventWrapper != null) {
                        if (eventWrapper.release()) {
                                eventsBuffer.remove(eventId);
    +                           eventsBufferCache.remove(eventId);
                        } else {
    -                           eventsBuffer.put(eventId, eventWrapper);
    +                           cacheEvent(eventId, eventWrapper);
                        }
                }
        }
     
    +   // Cache related method
    +
    +   /////////////////////////////////////////////
    +   //  Put
    +   /////////////////////////////////////////////
    +
    +   /**
    +    * Put a event to cache.
    +    * @param eventId id of the event
    --- End diff --
    
    Put an event.


---

Reply via email to