This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new f130e4b [FLINK-12319][Library/CEP]Change the logic of releasing node from recursive to non-recursive f130e4b is described below commit f130e4bc259746b1542a2f4d8907d3b35195feb8 Author: liyafan82 <42827532+liyafa...@users.noreply.github.com> AuthorDate: Fri Jul 5 14:44:45 2019 +0800 [FLINK-12319][Library/CEP]Change the logic of releasing node from recursive to non-recursive --- .../cep/nfa/sharedbuffer/SharedBufferAccessor.java | 45 +++++++++++----------- .../cep/nfa/sharedbuffer/SharedBufferTest.java | 38 ++++++++++++++++++ 2 files changed, 61 insertions(+), 22 deletions(-) diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java index 2613f9d..c92df1b 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java @@ -235,31 +235,32 @@ public class SharedBufferAccessor<V> implements AutoCloseable { * @throws Exception Thrown if the system cannot access the state. */ public void releaseNode(final NodeId node) throws Exception { - Lockable<SharedBufferNode> sharedBufferNode = sharedBuffer.getEntry(node); - if (sharedBufferNode != null) { - if (sharedBufferNode.release()) { - removeNode(node, sharedBufferNode.getElement()); - } else { - sharedBuffer.upsertEntry(node, sharedBufferNode); + // the stack used to detect all nodes that needs to be released. + Stack<NodeId> nodesToExamine = new Stack<>(); + nodesToExamine.push(node); + + while (!nodesToExamine.isEmpty()) { + NodeId curNode = nodesToExamine.pop(); + Lockable<SharedBufferNode> curBufferNode = sharedBuffer.getEntry(curNode); + + if (curBufferNode == null) { + break; } - } - } - /** - * Removes the {@code SharedBufferNode}, when the ref is decreased to zero, and also - * decrease the ref of the edge on this node. - * - * @param node id of the entry - * @param sharedBufferNode the node body to be removed - * @throws Exception Thrown if the system cannot access the state. - */ - private void removeNode(NodeId node, SharedBufferNode sharedBufferNode) throws Exception { - sharedBuffer.removeEntry(node); - EventId eventId = node.getEventId(); - releaseEvent(eventId); + if (curBufferNode.release()) { + // first release the current node + sharedBuffer.removeEntry(curNode); + releaseEvent(curNode.getEventId()); - for (SharedBufferEdge sharedBufferEdge : sharedBufferNode.getEdges()) { - releaseNode(sharedBufferEdge.getTarget()); + for (SharedBufferEdge sharedBufferEdge : curBufferNode.getElement().getEdges()) { + NodeId targetId = sharedBufferEdge.getTarget(); + if (targetId != null) { + nodesToExamine.push(targetId); + } + } + } else { + sharedBuffer.upsertEntry(curNode, curBufferNode); + } } } diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java index 0583e8b..abd5b8b 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java @@ -280,4 +280,42 @@ public class SharedBufferTest extends TestLogger { assertEquals(4, sharedBuffer.getSharedBufferNodeSize()); } + /** + * Test releasing a node which has a long path to the terminal node (the node without an out-going edge). + * @throws Exception if creating the shared buffer accessor fails. + */ + @Test + public void testReleaseNodesWithLongPath() throws Exception { + SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer()); + + final int numberEvents = 100000; + Event[] events = new Event[numberEvents]; + EventId[] eventIds = new EventId[numberEvents]; + NodeId[] nodeIds = new NodeId[numberEvents]; + + final long timestamp = 1L; + + for (int i = 0; i < numberEvents; i++) { + events[i] = new Event(i + 1, "e" + (i + 1), i); + eventIds[i] = sharedBuffer.registerEvent(events[i], timestamp); + } + + try (SharedBufferAccessor<Event> sharedBufferAccessor = sharedBuffer.getAccessor()) { + + for (int i = 0; i < numberEvents; i++) { + NodeId prevId = i == 0 ? null : nodeIds[i - 1]; + nodeIds[i] = sharedBufferAccessor.put("n" + i, eventIds[i], prevId, DeweyNumber.fromString("1.0")); + } + + NodeId lastNode = nodeIds[numberEvents - 1]; + sharedBufferAccessor.releaseNode(lastNode); + + for (int i = 0; i < numberEvents; i++) { + sharedBufferAccessor.releaseEvent(eventIds[i]); + } + } + + assertTrue(sharedBuffer.isEmpty()); + } + }