Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6205#discussion_r198146934
  
    --- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ---
    @@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws 
Exception {
         * @throws Exception Thrown if the system cannot access the state.
         */
        public void releaseEvent(EventId eventId) throws Exception {
    -           Lockable<V> eventWrapper = eventsBuffer.get(eventId);
    +           Lockable<V> eventWrapper = getEvent(eventId);
                if (eventWrapper != null) {
                        if (eventWrapper.release()) {
                                eventsBuffer.remove(eventId);
    +                           eventsBufferCache.remove(eventId);
                        } else {
    -                           eventsBuffer.put(eventId, eventWrapper);
    +                           cacheEvent(eventId, eventWrapper);
                        }
                }
        }
     
    +   // Cache related method
    +
    +   /////////////////////////////////////////////
    +   //  Put
    +   /////////////////////////////////////////////
    +
    +   /**
    +    * Put an event to cache.
    +    * @param eventId id of the event
    +    * @param event event body
    +    */
    +   private void cacheEvent(EventId eventId, Lockable<V> event) {
    +           this.eventsBufferCache.put(eventId, event);
    +   }
    +
    +   /**
    +    * Put a ShareBufferNode to cache.
    +    * @param nodeId id of the event
    +    * @param entry SharedBufferNode
    +    */
    +   private void cacheEntry(NodeId nodeId, Lockable<SharedBufferNode> 
entry) {
    +           this.entryCache.put(nodeId, entry);
    +   }
    +
    +   /////////////////////////////////////////////
    +   // Get
    +   /////////////////////////////////////////////
    +
    +   /**
    +    * Try to get the sharedBufferNode from state iff the node has not been 
quered during this turn process.
    +    * @param nodeId id of the event
    +    * @return SharedBufferNode
    +    * @throws Exception Thrown if the system cannot access the state.
    +    */
    +   private Lockable<SharedBufferNode> getEntry(NodeId nodeId) throws 
Exception {
    +           Lockable<SharedBufferNode> entry = entryCache.get(nodeId);
    +           return  entry != null ? entry : entries.get(nodeId);
    +   }
    +
    +   private Lockable<V> getEvent(EventId eventId) throws Exception {
    +           Lockable<V> event = eventsBufferCache.get(eventId);
    +           return event != null ? event : eventsBuffer.get(eventId);
    +   }
    +
    +   /**
    +    * Flush the event and node in map to state.
    +    * @throws Exception Thrown if the system cannot access the state.
    +    */
    +   public void flushCache() throws Exception {
    +           entryCache.forEach((k, v) -> {
    +                           try {
    +                                   entries.put(k, v);
    +                           } catch (Exception e) {
    +                                   throw new RuntimeException();
    +                           }
    +                   }
    +           );
    +
    +           eventsBufferCache.forEach((k, v) -> {
    +                           try {
    +                                   eventsBuffer.put(k, v);
    +                           } catch (Exception e) {
    +                                   throw new RuntimeException();
    --- End diff --
    
    In fact, what I means is that if you want to throw an exception here, you 
could throw the exception as `throw new RuntimeException("exception message", 
originalException)`, this way the original exception won't be swallowed.


---

Reply via email to