GitHub user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/3010#discussion_r219295502
  
    --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java ---
    @@ -662,6 +682,39 @@ private void handleReconnectionRequest(final ReconnectionRequestMessage request)
             }
         }
     
    +    private void handleDecommissionRequest(final DecommissionMessage request) throws InterruptedException {
    +        logger.info("Received decommission request message from manager with explanation: " + request.getExplanation());
    +        decommission(request.getExplanation());
    +    }
    +
    +    private void decommission(final String explanation) throws InterruptedException {
    +        writeLock.lock();
    +        try {
    +
    +            logger.info("Decommissioning node due to " + explanation);
    +
    +            // mark node as decommissioning
    +            controller.setConnectionStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.DECOMMISSIONING, DecommissionCode.DECOMMISSIONED, explanation));
    +            // request to stop all processors on node
    +            controller.stopAllProcessors();
    --- End diff --
    
    In addition to calling stopAllProcessors(), I think we should terminate all
active processors as well. We can do this by getting the Root Process Group
from the FlowController and walking it recursively: for each Processor in a
group, if processor.getScheduledState() == ScheduledState.STOPPED (we need this
check in case a processor is disabled), call
ProcessGroup.terminateProcessor(ProcessorNode), and then recurse through all
child groups.
    
    This will ensure that even if a Processor is holding a FlowFile and making
no progress, we can still push the FlowFiles out to other nodes in the cluster
in order to decommission.
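
    As a rough sketch of the traversal described above: the classes below are
simplified stand-ins that only borrow the NiFi names (they are not the real
ProcessGroup, ProcessorNode, or ScheduledState types), and terminateAll is a
hypothetical helper name. In the real code the starting group would be the Root
Process Group obtained from the FlowController.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: minimal stand-ins, NOT the real NiFi classes.
class TerminateSketch {

    enum ScheduledState { DISABLED, STOPPED, RUNNING }

    static class ProcessorNode {
        final String name;
        final ScheduledState state;
        boolean terminated = false;

        ProcessorNode(String name, ScheduledState state) {
            this.name = name;
            this.state = state;
        }

        ScheduledState getScheduledState() {
            return state;
        }
    }

    static class ProcessGroup {
        final List<ProcessorNode> processors = new ArrayList<>();
        final List<ProcessGroup> children = new ArrayList<>();

        List<ProcessorNode> getProcessors() {
            return processors;
        }

        List<ProcessGroup> getProcessGroups() {
            return children;
        }

        // Stand-in: the sketch only records that the processor was terminated.
        void terminateProcessor(ProcessorNode processor) {
            processor.terminated = true;
        }
    }

    // Hypothetical helper: terminate every STOPPED processor in this group,
    // then recurse into each child group. Returns how many were terminated.
    static int terminateAll(ProcessGroup group) {
        int count = 0;
        for (ProcessorNode processor : group.getProcessors()) {
            // Disabled processors are skipped by the STOPPED check.
            if (processor.getScheduledState() == ScheduledState.STOPPED) {
                group.terminateProcessor(processor);
                count++;
            }
        }
        for (ProcessGroup child : group.getProcessGroups()) {
            count += terminateAll(child);
        }
        return count;
    }

    public static void main(String[] args) {
        ProcessGroup root = new ProcessGroup();
        root.processors.add(new ProcessorNode("a", ScheduledState.STOPPED));
        root.processors.add(new ProcessorNode("b", ScheduledState.DISABLED));

        ProcessGroup child = new ProcessGroup();
        child.processors.add(new ProcessorNode("c", ScheduledState.STOPPED));
        root.children.add(child);

        System.out.println("terminated: " + terminateAll(root));
    }
}
```

    In decommission() this would run after controller.stopAllProcessors(), so
that the processors are already in the STOPPED state when the check is made.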

