Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/3010#discussion_r219295502
--- Diff:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
---
@@ -662,6 +682,39 @@ private void handleReconnectionRequest(final
ReconnectionRequestMessage request)
}
}
+ private void handleDecommissionRequest(final DecommissionMessage
request) throws InterruptedException {
+ logger.info("Received decommission request message from manager
with explanation: " + request.getExplanation());
+ decommission(request.getExplanation());
+ }
+
+ private void decommission(final String explanation) throws
InterruptedException {
+ writeLock.lock();
+ try {
+
+ logger.info("Decommissioning node due to " + explanation);
+
+ // mark node as decommissioning
+ controller.setConnectionStatus(new
NodeConnectionStatus(nodeId, NodeConnectionState.DECOMMISSIONING,
DecommissionCode.DECOMMISSIONED, explanation));
+ // request to stop all processors on node
+ controller.stopAllProcessors();
--- End diff --
In addition to calling stopAllProcessors() I think we should be terminating
all active processors as well. We can do this by getting the Root Process Group
from the FlowController, then from that getting all Processors and for each one
if processor.getScheduledState() == ScheduledState.STOPPED (do this in case a
processor is disabled), call ProcessGroup.terminateProcessor(ProcessorNode) and
then recursing through all groups.
This will ensure that even if a Processor has hold of a FlowFile and makes
no progress we can still push the FlowFiles out to other nodes in the cluster
in order to decommission.
---