Repository: nifi
Updated Branches:
  refs/heads/master cca520aab -> d75ba167c


NIFI-5225: Purge event data from event repository when Connectable is removed

This closes #2732.

Signed-off-by: Mark Payne <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d75ba167
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d75ba167
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d75ba167

Branch: refs/heads/master
Commit: d75ba167cd93042c3f747f4aacb507617694bc0c
Parents: cca520a
Author: Frederik Petersen <[email protected]>
Authored: Tue May 22 12:55:59 2018 +0200
Committer: Mark Payne <[email protected]>
Committed: Wed May 23 13:39:20 2018 -0400

----------------------------------------------------------------------
 .../repository/FlowFileEventRepository.java     |  8 +++++
 .../apache/nifi/controller/FlowController.java  | 20 ++++++++---
 .../metrics/RingBufferEventRepository.java      |  5 +++
 .../TestRingBufferEventRepository.java          | 35 ++++++++++++++++++--
 4 files changed, 60 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/d75ba167/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java
index 560dc05..1781d18 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java
@@ -42,4 +42,12 @@ public interface FlowFileEventRepository extends Closeable {
      * @param cutoffEpochMilliseconds cutoff
      */
     void purgeTransferEvents(long cutoffEpochMilliseconds);
+
+    /**
+     * Causes any flow file events of the given component to be purged from the
+     * repository
+     *
+     * @param componentIdentifier Identifier of the component
+     */
+    void purgeTransferEvents(String componentIdentifier);
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d75ba167/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 98b33ec..db18ada 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -2651,7 +2651,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     }
 
     public void onProcessorRemoved(final ProcessorNode procNode) {
-        allProcessors.remove(procNode.getIdentifier());
+        String identifier = procNode.getIdentifier();
+        flowFileEventRepository.purgeTransferEvents(identifier);
+        allProcessors.remove(identifier);
     }
 
     public ProcessorNode getProcessorNode(final String id) {
@@ -2663,7 +2665,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     }
 
     public void onConnectionRemoved(final Connection connection) {
-        allConnections.remove(connection.getIdentifier());
+        String identifier = connection.getIdentifier();
+        flowFileEventRepository.purgeTransferEvents(identifier);
+        allConnections.remove(identifier);
     }
 
     public Connection getConnection(final String id) {
@@ -2675,7 +2679,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     }
 
     public void onInputPortRemoved(final Port inputPort) {
-        allInputPorts.remove(inputPort.getIdentifier());
+        String identifier = inputPort.getIdentifier();
+        flowFileEventRepository.purgeTransferEvents(identifier);
+        allInputPorts.remove(identifier);
     }
 
     public Port getInputPort(final String id) {
@@ -2687,7 +2693,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     }
 
     public void onOutputPortRemoved(final Port outputPort) {
-        allOutputPorts.remove(outputPort.getIdentifier());
+        String identifier = outputPort.getIdentifier();
+        flowFileEventRepository.purgeTransferEvents(identifier);
+        allOutputPorts.remove(identifier);
     }
 
     public Port getOutputPort(final String id) {
@@ -2699,7 +2707,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     }
 
     public void onFunnelRemoved(final Funnel funnel) {
-        allFunnels.remove(funnel.getIdentifier());
+        String identifier = funnel.getIdentifier();
+        flowFileEventRepository.purgeTransferEvents(identifier);
+        allFunnels.remove(identifier);
     }
 
     public Funnel getFunnel(final String id) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/d75ba167/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java
index b9a82ed..c60f98d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java
@@ -64,4 +64,9 @@ public class RingBufferEventRepository implements FlowFileEventRepository {
         }
     }
 
+    @Override
+    public void purgeTransferEvents(String componentIdentifier) {
+        componentEventMap.remove(componentIdentifier);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d75ba167/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRingBufferEventRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRingBufferEventRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRingBufferEventRepository.java
index cb5c306..2bc158f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRingBufferEventRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRingBufferEventRepository.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.junit.Test;
+import org.testng.Assert;
 
 public class TestRingBufferEventRepository {
 
@@ -33,7 +34,7 @@ public class TestRingBufferEventRepository {
         final RingBufferEventRepository repo = new RingBufferEventRepository(5);
         long insertNanos = 0L;
         for (int i = 0; i < 1000000; i++) {
-            final FlowFileEvent event = generateEvent();
+            final FlowFileEvent event = generateEvent("ABC");
 
             final long insertStart = System.nanoTime();
             repo.updateRepository(event);
@@ -49,11 +50,39 @@ public class TestRingBufferEventRepository {
         repo.close();
     }
 
-    private FlowFileEvent generateEvent() {
+    @Test
+    public void testPurge() throws IOException {
+        final FlowFileEventRepository repo = new RingBufferEventRepository(5);
+        String id1 = "component1";
+        String id2 = "component2";
+        repo.updateRepository(generateEvent(id1));
+        repo.updateRepository(generateEvent(id2));
+        RepositoryStatusReport report = repo.reportTransferEvents(System.currentTimeMillis() - 2 * 60000);
+        FlowFileEvent entry = report.getReportEntry(id1);
+        Assert.assertNotNull(entry);
+        entry = report.getReportEntry(id2);
+        Assert.assertNotNull(entry);
+
+        repo.purgeTransferEvents(id1);
+        report = repo.reportTransferEvents(System.currentTimeMillis() - 2 * 60000);
+        entry = report.getReportEntry(id1);
+        Assert.assertNull(entry);
+        entry = report.getReportEntry(id2);
+        Assert.assertNotNull(entry);
+
+        repo.purgeTransferEvents(id2);
+        report = repo.reportTransferEvents(System.currentTimeMillis() - 2 * 60000);
+        entry = report.getReportEntry(id2);
+        Assert.assertNull(entry);
+
+        repo.close();
+    }
+
+    private FlowFileEvent generateEvent(final String id) {
         return new FlowFileEvent() {
             @Override
             public String getComponentIdentifier() {
-                return "ABC";
+                return id;
             }
 
             @Override

Reply via email to