This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 3ea9fac  NIFI-8040: When changing version of a flow, stop processors that have a state of Starting in addition to those with a state of Running
3ea9fac is described below

commit 3ea9faccc692554d5c352562e462cc1a8e06c177
Author: Mark Payne <[email protected]>
AuthorDate: Wed Jan 26 10:03:55 2022 -0500

    NIFI-8040: When changing version of a flow, stop processors that have a state of Starting in addition to those with a state of Running
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #5718.
---
 .../org/apache/nifi/web/StandardNiFiServiceFacade.java |  3 ++-
 .../org/apache/nifi/web/api/FlowUpdateResource.java    | 18 ++++++++++++++++--
 2 files changed, 18 insertions(+), 3 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 445c7ee..7d01272 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -5056,7 +5056,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
             }
 
             if (localComponent.getComponentType() == 
org.apache.nifi.flow.ComponentType.CONTROLLER_SERVICE) {
-                final String serviceId = 
((InstantiatedVersionedControllerService) 
localComponent).getInstanceIdentifier();
+                final String serviceId = 
localComponent.getInstanceIdentifier();
                 final ControllerServiceNode serviceNode = 
controllerServiceDAO.getControllerService(serviceId);
 
                 final List<ControllerServiceNode> referencingServices = 
serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class);
@@ -5171,6 +5171,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
         dto.setId(connectable.getIdentifier());
         dto.setReferenceType(connectable.getConnectableType().name());
         dto.setState(connectable.getScheduledState().name());
+        dto.setName(connectable.getName());
 
         final String groupId = connectable instanceof RemoteGroupPort ? 
((RemoteGroupPort) connectable).getRemoteProcessGroup().getIdentifier() : 
connectable.getProcessGroupIdentifier();
         dto.setProcessGroupId(groupId);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowUpdateResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowUpdateResource.java
index 8bfed3c..97a23f8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowUpdateResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowUpdateResource.java
@@ -299,6 +299,20 @@ public abstract class FlowUpdateResource<T extends ProcessGroupDescriptorEntity,
         return createUpdateRequestResponse(requestType, requestId, request, 
false);
     }
 
+    private boolean isActive(final AffectedComponentDTO affectedComponentDto) {
+        final String state = affectedComponentDto.getState();
+        if ("Running".equalsIgnoreCase(state) || 
"Starting".equalsIgnoreCase(state)) {
+            return true;
+        }
+
+        final Integer threadCount = 
affectedComponentDto.getActiveThreadCount();
+        if (threadCount != null && threadCount > 0) {
+            return true;
+        }
+
+        return false;
+    }
+
     /**
      * Perform the specified flow update
      */
@@ -318,8 +332,8 @@ public abstract class FlowUpdateResource<T extends ProcessGroupDescriptorEntity,
         
stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_OUTPUT_PORT);
 
         final Set<AffectedComponentEntity> runningComponents = 
affectedComponents.stream()
-                .filter(dto -> 
stoppableReferenceTypes.contains(dto.getComponent().getReferenceType()))
-                .filter(dto -> 
"Running".equalsIgnoreCase(dto.getComponent().getState()))
+                .filter(entity -> 
stoppableReferenceTypes.contains(entity.getComponent().getReferenceType()))
+                .filter(entity -> isActive(entity.getComponent()))
                 .collect(Collectors.toSet());
 
         logger.info("Stopping {} Processors", runningComponents.size());

Reply via email to