Repository: nifi Updated Branches: refs/heads/NIFI-108-2 3ad5b3ea0 -> 1e74357ca
NIFI-108: Added heartbeat when we finish clearing queue Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1e74357c Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1e74357c Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1e74357c Branch: refs/heads/NIFI-108-2 Commit: 1e74357caf5405701d8bf24e73e1f450c18e87a0 Parents: 3ad5b3e Author: Mark Payne <[email protected]> Authored: Fri Jan 15 14:16:48 2016 -0500 Committer: Mark Payne <[email protected]> Committed: Fri Jan 15 14:16:48 2016 -0500 ---------------------------------------------------------------------- .../org/apache/nifi/connectable/StandardConnection.java | 9 ++++++++- .../java/org/apache/nifi/controller/FlowController.java | 1 + .../org/apache/nifi/controller/StandardFlowFileQueue.java | 8 +++++++- .../apache/nifi/controller/TestStandardFlowFileQueue.java | 2 +- .../controller/repository/TestStandardProcessSession.java | 2 +- 5 files changed, 18 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/1e74357c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java index d43a3db..1ef18c0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.nifi.controller.Heartbeater; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.StandardFlowFileQueue; import org.apache.nifi.controller.queue.FlowFileQueue; @@ -70,7 +71,7 @@ public final class StandardConnection implements Connection { relationships = new AtomicReference<>(Collections.unmodifiableCollection(builder.relationships)); scheduler = builder.scheduler; flowFileQueue = new StandardFlowFileQueue(id, this, builder.flowFileRepository, builder.provenanceRepository, builder.resourceClaimManager, - scheduler, builder.swapManager, builder.eventReporter, NiFiProperties.getInstance().getQueueSwapThreshold()); + scheduler, builder.swapManager, builder.eventReporter, NiFiProperties.getInstance().getQueueSwapThreshold(), builder.heartbeater); hashCode = new HashCodeBuilder(7, 67).append(id).toHashCode(); } @@ -269,6 +270,7 @@ public final class StandardConnection implements Connection { private FlowFileRepository flowFileRepository; private ProvenanceEventRepository provenanceRepository; private ResourceClaimManager resourceClaimManager; + private Heartbeater heartbeater; public Builder(final ProcessScheduler scheduler) { this.scheduler = scheduler; @@ -304,6 +306,11 @@ public final class StandardConnection implements Connection { return this; } + public Builder heartbeater(final Heartbeater heartbeater) { + this.heartbeater = heartbeater; + return this; + } + public Builder bendPoints(final List<Position> bendPoints) { this.bendPoints.clear(); this.bendPoints.addAll(bendPoints); http://git-wip-us.apache.org/repos/asf/nifi/blob/1e74357c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index dd3b687..0cab9ad 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -770,6 +770,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R .resourceClaimManager(resourceClaimManager) .flowFileRepository(flowFileRepository) .provenanceRepository(provenanceEventRepository) + .heartbeater(this) .build(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/1e74357c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java index 6d3f0c9..cda00c8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java @@ -107,6 +107,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { private final FlowFileRepository flowFileRepository; private final ProvenanceEventRepository provRepository; private final ResourceClaimManager resourceClaimManager; + private final Heartbeater heartbeater; private final ConcurrentMap<String, DropFlowFileRequest> dropRequestMap = new ConcurrentHashMap<>(); private final ConcurrentMap<String, ListFlowFileRequest> listRequestMap = new ConcurrentHashMap<>(); @@ -115,7 +116,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue { private final ProcessScheduler scheduler; public StandardFlowFileQueue(final String identifier, final Connection connection, final FlowFileRepository flowFileRepo, final ProvenanceEventRepository provRepo, - final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter, final int swapThreshold) { + final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter, final int swapThreshold, + final Heartbeater heartbeater) { activeQueue = new PriorityQueue<>(20, new Prioritizer(new ArrayList<FlowFilePrioritizer>())); priorities = new ArrayList<>(); swapQueue = new ArrayList<>(); @@ -129,6 +131,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { this.swapThreshold = swapThreshold; this.scheduler = scheduler; this.connection = connection; + this.heartbeater = heartbeater; readLock = new TimedLock(this.lock.readLock(), identifier + " Read Lock", 100); writeLock = new TimedLock(this.lock.writeLock(), identifier + " Write Lock", 100); @@ -1137,6 +1140,9 @@ public final class StandardFlowFileQueue implements FlowFileQueue { logger.info("Successfully dropped {} FlowFiles ({} bytes) from Connection with ID {} on behalf of {}", dropRequest.getDroppedSize().getObjectCount(), dropRequest.getDroppedSize().getByteCount(), StandardFlowFileQueue.this.getIdentifier(), requestor); dropRequest.setState(DropFlowFileState.COMPLETE); + if (heartbeater != null) { + heartbeater.heartbeat(); + } } catch (final Exception e) { logger.error("Failed to drop FlowFiles from Connection with ID {} due to {}", StandardFlowFileQueue.this.getIdentifier(), e.toString()); logger.error("", e); http://git-wip-us.apache.org/repos/asf/nifi/blob/1e74357c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java index 8cfe146..32d8566 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java @@ -102,7 +102,7 @@ public class TestStandardFlowFileQueue { } }).when(provRepo).registerEvents(Mockito.any(Iterable.class)); - queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000); + queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000, null); TestFlowFile.idGenerator.set(0L); } http://git-wip-us.apache.org/repos/asf/nifi/blob/1e74357c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index 644018f..f8db35e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -139,7 +139,7 @@ public class TestStandardProcessSession { final ProcessScheduler processScheduler = Mockito.mock(ProcessScheduler.class); final FlowFileSwapManager swapManager = Mockito.mock(FlowFileSwapManager.class); - flowFileQueue = new StandardFlowFileQueue("1", connection, flowFileRepo, provenanceRepo, null, processScheduler, swapManager, null, 10000); + flowFileQueue = new StandardFlowFileQueue("1", connection, flowFileRepo, provenanceRepo, null, processScheduler, swapManager, null, 10000, null); when(connection.getFlowFileQueue()).thenReturn(flowFileQueue); Mockito.doAnswer(new Answer<Object>() {
