[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523713#comment-16523713 ]
ASF GitHub Bot commented on FLINK-9642:
---------------------------------------
GitHub user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/6205#discussion_r198135717
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception {
* @throws Exception Thrown if the system cannot access the state.
*/
public void releaseEvent(EventId eventId) throws Exception {
- Lockable<V> eventWrapper = eventsBuffer.get(eventId);
+ Lockable<V> eventWrapper = getEvent(eventId);
if (eventWrapper != null) {
if (eventWrapper.release()) {
eventsBuffer.remove(eventId);
+ eventsBufferCache.remove(eventId);
} else {
- eventsBuffer.put(eventId, eventWrapper);
+ cacheEvent(eventId, eventWrapper);
}
}
}
+ // Cache related method
+
+ /////////////////////////////////////////////
+ // Put
+ /////////////////////////////////////////////
+
+ /**
+ * Put an event to cache.
+ * @param eventId id of the event
+ * @param event event body
+ */
+ private void cacheEvent(EventId eventId, Lockable<V> event) {
+ this.eventsBufferCache.put(eventId, event);
+ }
+
+ /**
+ * Put a ShareBufferNode to cache.
+ * @param nodeId id of the event
+ * @param entry SharedBufferNode
+ */
+ private void cacheEntry(NodeId nodeId, Lockable<SharedBufferNode> entry) {
+ this.entryCache.put(nodeId, entry);
+ }
+
+ /////////////////////////////////////////////
+ // Get
+ /////////////////////////////////////////////
+
+ /**
+ * Try to get the sharedBufferNode from state iff the node has not been quered during this turn process.
+ * @param nodeId id of the event
+ * @return SharedBufferNode
+ * @throws Exception Thrown if the system cannot access the state.
+ */
+ private Lockable<SharedBufferNode> getEntry(NodeId nodeId) throws Exception {
+ Lockable<SharedBufferNode> entry = entryCache.get(nodeId);
+ return entry != null ? entry : entries.get(nodeId);
+ }
+
+ private Lockable<V> getEvent(EventId eventId) throws Exception {
+ Lockable<V> event = eventsBufferCache.get(eventId);
+ return event != null ? event : eventsBuffer.get(eventId);
+ }
+
+ /**
+ * Flush the event and node in map to state.
+ * @throws Exception Thrown if the system cannot access the state.
+ */
+ public void flushCache() throws Exception {
+ entryCache.forEach((k, v) -> {
--- End diff --
I would suggest using `entries.putAll()` instead, since it can perform
better when you are using the RocksDB backend.
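
To make the suggestion concrete, here is a minimal sketch contrasting a per-entry flush with a single bulk flush. `CountingState` is a hypothetical in-memory stand-in that only mirrors the `put` and `putAll` methods of Flink's `MapState`; it is not the real state backend, and the class and method names below are assumptions for illustration. It counts write calls only and measures nothing about actual RocksDB performance.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

class FlushCacheSketch {

    /** Hypothetical stand-in for MapState: records how many write calls it receives. */
    static class CountingState<K, V> {
        private final Map<K, V> backing = new HashMap<>();
        int writeCalls = 0;

        void put(K key, V value) {
            writeCalls++;
            backing.put(key, value);
        }

        void putAll(Map<K, V> map) {
            writeCalls++;
            backing.putAll(map);
        }

        V get(K key) {
            return backing.get(key);
        }
    }

    /** Flushes the cache with one state write per cached entry. */
    static <K, V> void flushPerEntry(Map<K, V> cache, CountingState<K, V> state) {
        cache.forEach(state::put);
        cache.clear();
    }

    /** Flushes the cache with a single bulk write, as the review suggests. */
    static <K, V> void flushBatched(Map<K, V> cache, CountingState<K, V> state) {
        if (!cache.isEmpty()) {
            state.putAll(cache);
            cache.clear();
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> cacheA = new LinkedHashMap<>();
        Map<String, Integer> cacheB = new LinkedHashMap<>();
        for (int i = 0; i < 5; i++) {
            cacheA.put("node-" + i, i);
            cacheB.put("node-" + i, i);
        }

        CountingState<String, Integer> stateA = new CountingState<>();
        CountingState<String, Integer> stateB = new CountingState<>();

        flushPerEntry(cacheA, stateA);
        flushBatched(cacheB, stateB);

        System.out.println("per-entry write calls: " + stateA.writeCalls);
        System.out.println("batched write calls: " + stateB.writeCalls);
    }
}
```

Both variants leave the same contents in the state; the difference is that the bulk call hands the backend the whole map at once, which gives it the opportunity to batch the writes.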
> Reduce the count to deal with state during a CEP process
> ---------------------------------------------------------
>
> Key: FLINK-9642
> URL: https://issues.apache.org/jira/browse/FLINK-9642
> Project: Flink
> Issue Type: Improvement
> Components: CEP
> Affects Versions: 1.6.0
> Reporter: aitozi
> Assignee: aitozi
> Priority: Major
> Labels: pull-request-available
>
> With the rework of the SharedBuffer in FLINK-9418, the lock and release
> operations now work directly against RocksDB state, unlike the previous
> version, which read the whole SharedBuffer state into memory. I think we can
> add a cache or variable in the SharedBuffer that holds the Lockable objects
> and records their reference-count changes during a single NFA processing
> pass. This reduces the number of state accesses when several events point to
> the same NodeId. The result is flushed to MapState at the end of processing.