This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 3ea9fac NIFI-8040: When changing version of a flow, stop processors
that have a state of Starting in addition to those with a state of Running
3ea9fac is described below
commit 3ea9faccc692554d5c352562e462cc1a8e06c177
Author: Mark Payne <[email protected]>
AuthorDate: Wed Jan 26 10:03:55 2022 -0500
NIFI-8040: When changing version of a flow, stop processors that have a
state of Starting in addition to those with a state of Running
Signed-off-by: Pierre Villard <[email protected]>
This closes #5718.
---
.../org/apache/nifi/web/StandardNiFiServiceFacade.java | 3 ++-
.../org/apache/nifi/web/api/FlowUpdateResource.java | 18 ++++++++++++++++--
2 files changed, 18 insertions(+), 3 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 445c7ee..7d01272 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -5056,7 +5056,7 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
}
if (localComponent.getComponentType() ==
org.apache.nifi.flow.ComponentType.CONTROLLER_SERVICE) {
- final String serviceId =
((InstantiatedVersionedControllerService)
localComponent).getInstanceIdentifier();
+ final String serviceId =
localComponent.getInstanceIdentifier();
final ControllerServiceNode serviceNode =
controllerServiceDAO.getControllerService(serviceId);
final List<ControllerServiceNode> referencingServices =
serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class);
@@ -5171,6 +5171,7 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
dto.setId(connectable.getIdentifier());
dto.setReferenceType(connectable.getConnectableType().name());
dto.setState(connectable.getScheduledState().name());
+ dto.setName(connectable.getName());
final String groupId = connectable instanceof RemoteGroupPort ?
((RemoteGroupPort) connectable).getRemoteProcessGroup().getIdentifier() :
connectable.getProcessGroupIdentifier();
dto.setProcessGroupId(groupId);
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowUpdateResource.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowUpdateResource.java
index 8bfed3c..97a23f8 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowUpdateResource.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowUpdateResource.java
@@ -299,6 +299,20 @@ public abstract class FlowUpdateResource<T extends
ProcessGroupDescriptorEntity,
return createUpdateRequestResponse(requestType, requestId, request,
false);
}
+ private boolean isActive(final AffectedComponentDTO affectedComponentDto) {
+ final String state = affectedComponentDto.getState();
+ if ("Running".equalsIgnoreCase(state) ||
"Starting".equalsIgnoreCase(state)) {
+ return true;
+ }
+
+ final Integer threadCount =
affectedComponentDto.getActiveThreadCount();
+ if (threadCount != null && threadCount > 0) {
+ return true;
+ }
+
+ return false;
+ }
+
/**
* Perform the specified flow update
*/
@@ -318,8 +332,8 @@ public abstract class FlowUpdateResource<T extends
ProcessGroupDescriptorEntity,
stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_OUTPUT_PORT);
final Set<AffectedComponentEntity> runningComponents =
affectedComponents.stream()
- .filter(dto ->
stoppableReferenceTypes.contains(dto.getComponent().getReferenceType()))
- .filter(dto ->
"Running".equalsIgnoreCase(dto.getComponent().getState()))
+ .filter(entity ->
stoppableReferenceTypes.contains(entity.getComponent().getReferenceType()))
+ .filter(entity -> isActive(entity.getComponent()))
.collect(Collectors.toSet());
logger.info("Stopping {} Processors", runningComponents.size());