Repository: nifi
Updated Branches:
  refs/heads/NIFI-108-2 3ad5b3ea0 -> 1e74357ca


NIFI-108: Added heartbeat when we finish clearing queue


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1e74357c
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1e74357c
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1e74357c

Branch: refs/heads/NIFI-108-2
Commit: 1e74357caf5405701d8bf24e73e1f450c18e87a0
Parents: 3ad5b3e
Author: Mark Payne <[email protected]>
Authored: Fri Jan 15 14:16:48 2016 -0500
Committer: Mark Payne <[email protected]>
Committed: Fri Jan 15 14:16:48 2016 -0500

----------------------------------------------------------------------
 .../org/apache/nifi/connectable/StandardConnection.java     | 9 ++++++++-
 .../java/org/apache/nifi/controller/FlowController.java     | 1 +
 .../org/apache/nifi/controller/StandardFlowFileQueue.java   | 8 +++++++-
 .../apache/nifi/controller/TestStandardFlowFileQueue.java   | 2 +-
 .../controller/repository/TestStandardProcessSession.java   | 2 +-
 5 files changed, 18 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/1e74357c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
index d43a3db..1ef18c0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.nifi.controller.Heartbeater;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.StandardFlowFileQueue;
 import org.apache.nifi.controller.queue.FlowFileQueue;
@@ -70,7 +71,7 @@ public final class StandardConnection implements Connection {
         relationships = new AtomicReference<>(Collections.unmodifiableCollection(builder.relationships));
         scheduler = builder.scheduler;
         flowFileQueue = new StandardFlowFileQueue(id, this, builder.flowFileRepository, builder.provenanceRepository, builder.resourceClaimManager,
-            scheduler, builder.swapManager, builder.eventReporter, NiFiProperties.getInstance().getQueueSwapThreshold());
+            scheduler, builder.swapManager, builder.eventReporter, NiFiProperties.getInstance().getQueueSwapThreshold(), builder.heartbeater);
         hashCode = new HashCodeBuilder(7, 67).append(id).toHashCode();
     }
 
@@ -269,6 +270,7 @@ public final class StandardConnection implements Connection {
         private FlowFileRepository flowFileRepository;
         private ProvenanceEventRepository provenanceRepository;
         private ResourceClaimManager resourceClaimManager;
+        private Heartbeater heartbeater;
 
         public Builder(final ProcessScheduler scheduler) {
             this.scheduler = scheduler;
@@ -304,6 +306,11 @@ public final class StandardConnection implements Connection {
             return this;
         }
 
+        public Builder heartbeater(final Heartbeater heartbeater) {
+            this.heartbeater = heartbeater;
+            return this;
+        }
+
         public Builder bendPoints(final List<Position> bendPoints) {
             this.bendPoints.clear();
             this.bendPoints.addAll(bendPoints);

http://git-wip-us.apache.org/repos/asf/nifi/blob/1e74357c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index dd3b687..0cab9ad 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -770,6 +770,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             .resourceClaimManager(resourceClaimManager)
             .flowFileRepository(flowFileRepository)
             .provenanceRepository(provenanceEventRepository)
+            .heartbeater(this)
             .build();
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/1e74357c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index 6d3f0c9..cda00c8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -107,6 +107,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
     private final FlowFileRepository flowFileRepository;
     private final ProvenanceEventRepository provRepository;
     private final ResourceClaimManager resourceClaimManager;
+    private final Heartbeater heartbeater;
 
     private final ConcurrentMap<String, DropFlowFileRequest> dropRequestMap = new ConcurrentHashMap<>();
     private final ConcurrentMap<String, ListFlowFileRequest> listRequestMap = new ConcurrentHashMap<>();
@@ -115,7 +116,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
     private final ProcessScheduler scheduler;
 
     public StandardFlowFileQueue(final String identifier, final Connection connection, final FlowFileRepository flowFileRepo, final ProvenanceEventRepository provRepo,
-        final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter, final int swapThreshold) {
+        final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter, final int swapThreshold,
+        final Heartbeater heartbeater) {
         activeQueue = new PriorityQueue<>(20, new Prioritizer(new ArrayList<FlowFilePrioritizer>()));
         priorities = new ArrayList<>();
         swapQueue = new ArrayList<>();
@@ -129,6 +131,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
         this.swapThreshold = swapThreshold;
         this.scheduler = scheduler;
         this.connection = connection;
+        this.heartbeater = heartbeater;
 
         readLock = new TimedLock(this.lock.readLock(), identifier + " Read Lock", 100);
         writeLock = new TimedLock(this.lock.writeLock(), identifier + " Write Lock", 100);
@@ -1137,6 +1140,9 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
                         logger.info("Successfully dropped {} FlowFiles ({} bytes) from Connection with ID {} on behalf of {}",
                             dropRequest.getDroppedSize().getObjectCount(), dropRequest.getDroppedSize().getByteCount(), StandardFlowFileQueue.this.getIdentifier(), requestor);
                         dropRequest.setState(DropFlowFileState.COMPLETE);
+                        if (heartbeater != null) {
+                            heartbeater.heartbeat();
+                        }
                     } catch (final Exception e) {
                         logger.error("Failed to drop FlowFiles from Connection with ID {} due to {}", StandardFlowFileQueue.this.getIdentifier(), e.toString());
                         logger.error("", e);

http://git-wip-us.apache.org/repos/asf/nifi/blob/1e74357c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
index 8cfe146..32d8566 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
@@ -102,7 +102,7 @@ public class TestStandardFlowFileQueue {
             }
         }).when(provRepo).registerEvents(Mockito.any(Iterable.class));
 
-        queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000);
+        queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000, null);
         TestFlowFile.idGenerator.set(0L);
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/1e74357c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 644018f..f8db35e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -139,7 +139,7 @@ public class TestStandardProcessSession {
         final ProcessScheduler processScheduler = Mockito.mock(ProcessScheduler.class);
 
         final FlowFileSwapManager swapManager = Mockito.mock(FlowFileSwapManager.class);
-        flowFileQueue = new StandardFlowFileQueue("1", connection, flowFileRepo, provenanceRepo, null, processScheduler, swapManager, null, 10000);
+        flowFileQueue = new StandardFlowFileQueue("1", connection, flowFileRepo, provenanceRepo, null, processScheduler, swapManager, null, 10000, null);
         when(connection.getFlowFileQueue()).thenReturn(flowFileQueue);
 
         Mockito.doAnswer(new Answer<Object>() {

Reply via email to