This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new f04a3adc7e NIFI-14663: Fixed threading issue in which 
ProcessorNode.isRunning() can return false before shutdown lifecycle methods 
have been run. Fixed bug in System Test, in which the test was not waiting for 
Controller Service to become valid before starting Processor that references 
it. Also fixed a typo in NiFiClientUtil in which 
waitForControllerServicesEnabled was misspelled as 
waitForControllerSerivcesEnabled.
f04a3adc7e is described below

commit f04a3adc7eb9fe2472c193686ebf4e9ba1fa24d6
Author: Mark Payne <[email protected]>
AuthorDate: Sat Jun 14 11:18:29 2025 -0400

    NIFI-14663: Fixed threading issue in which ProcessorNode.isRunning() can 
return false before shutdown lifecycle methods have been run.
    Fixed bug in System Test, in which the test was not waiting for Controller 
Service to become valid before starting Processor that references it. Also 
fixed a typo in NiFiClientUtil in which waitForControllerServicesEnabled was 
misspelled as waitForControllerSerivcesEnabled.
---
 .../nifi/controller/StandardProcessorNode.java     |  6 ++++--
 .../org/apache/nifi/controller/ProcessorNode.java  |  3 ---
 .../apache/nifi/tests/system/NiFiClientUtil.java   | 24 +++++++++++-----------
 .../system/clustering/FlowSynchronizationIT.java   |  2 ++
 .../ControllerServiceApiValidationIT.java          | 12 +++++------
 .../ControllerServiceLifecycleIT.java              |  4 ++--
 .../DependentControllerServiceIT.java              |  6 +++---
 .../system/nar/NarProviderAndAutoLoaderIT.java     |  2 +-
 8 files changed, 30 insertions(+), 29 deletions(-)

diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index a1ac2a1205..c3a543bc49 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -918,8 +918,10 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
 
     @Override
     public boolean isRunning() {
-        final ScheduledState state = getScheduledState();
-        return state == ScheduledState.RUNNING || state == 
ScheduledState.STARTING || hasActiveThreads;
+        final ScheduledState state = getPhysicalScheduledState();
+        return state == ScheduledState.RUNNING || state == 
ScheduledState.STARTING
+               || state == ScheduledState.STOPPING || state == 
ScheduledState.RUN_ONCE
+               || hasActiveThreads;
     }
 
     @Override
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
index 7286ebd964..64e60c919f 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
@@ -140,9 +140,6 @@ public abstract class ProcessorNode extends 
AbstractComponentNode implements Con
 
     public abstract void verifyCanTerminate();
 
-    /**
-     *
-     */
     @Override
     public ScheduledState getScheduledState() {
         ScheduledState sc = this.scheduledState.get();
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
index 3ce5035413..849662a27e 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
@@ -926,13 +926,13 @@ public class NiFiClientUtil {
                 continue;
             }
 
-            if ("RUNNING".equals(expectedState)) {
-                return;
-            }
-
             final ProcessorStatusSnapshotDTO snapshotDto = 
entity.getStatus().getAggregateSnapshot();
-            if (snapshotDto.getActiveThreadCount() == 0 && 
snapshotDto.getTerminatedThreadCount() == 0) {
-                logger.info("Processor {} has reached desired state of {}", 
processorId, expectedState);
+            final Integer activeThreadCount = 
snapshotDto.getActiveThreadCount();
+            final Integer terminatedThreadCount = 
snapshotDto.getTerminatedThreadCount();
+
+            if ("RUNNING".equals(expectedState) || (activeThreadCount == 0 && 
terminatedThreadCount == 0)) {
+                logger.info("Processor {} is now in desired state of {} with 
{} active threads and {} terminated threads",
+                    processorId, expectedState, activeThreadCount, 
terminatedThreadCount);
                 return;
             }
 
@@ -997,7 +997,7 @@ public class NiFiClientUtil {
 
         final ActivateControllerServicesEntity activateControllerServices = 
nifiClient.getFlowClient().activateControllerServices(activateControllerServicesEntity);
         if (waitForEnabled) {
-            waitForControllerSerivcesEnabled(groupId);
+            waitForControllerServicesEnabled(groupId);
         }
 
         return activateControllerServices;
@@ -1198,7 +1198,7 @@ public class NiFiClientUtil {
         activateControllerServicesEntity.setDisconnectedNodeAcknowledged(true);
 
         final ActivateControllerServicesEntity activateControllerServices = 
nifiClient.getFlowClient().activateControllerServices(activateControllerServicesEntity);
-        waitForControllerSerivcesDisabled(groupId);
+        waitForControllerServicesDisabled(groupId);
 
         if (recurse) {
             final ProcessGroupFlowEntity groupEntity = 
nifiClient.getFlowClient().getProcessGroup(groupId);
@@ -1256,15 +1256,15 @@ public class NiFiClientUtil {
         }
     }
 
-    public void waitForControllerSerivcesDisabled(final String groupId, final 
String... serviceIdsOfInterest) throws NiFiClientException, IOException {
+    public void waitForControllerServicesDisabled(final String groupId, final 
String... serviceIdsOfInterest) throws NiFiClientException, IOException {
         waitForControllerServiceState(groupId, "DISABLED", 
Arrays.asList(serviceIdsOfInterest));
     }
 
-    public void waitForControllerSerivcesEnabled(final String groupId, final 
String... serviceIdsOfInterest) throws NiFiClientException, IOException {
+    public void waitForControllerServicesEnabled(final String groupId, final 
String... serviceIdsOfInterest) throws NiFiClientException, IOException {
         waitForControllerServiceState(groupId, "ENABLED", 
Arrays.asList(serviceIdsOfInterest));
     }
 
-    public void waitForControllerSerivcesEnabled(final String groupId, final 
List<String> serviceIdsOfInterest) throws NiFiClientException, IOException {
+    public void waitForControllerServicesEnabled(final String groupId, final 
List<String> serviceIdsOfInterest) throws NiFiClientException, IOException {
         waitForControllerServiceState(groupId, "ENABLED", 
serviceIdsOfInterest);
     }
 
@@ -1278,7 +1278,7 @@ public class NiFiClientUtil {
                 return;
             }
 
-            final ControllerServiceEntity entity = nonDisabledServices.get(0);
+            final ControllerServiceEntity entity = 
nonDisabledServices.getFirst();
             logger.info("Controller Service ID [{}] Type [{}] State [{}] 
waiting for State [{}]: sleeping for 500 ms before retrying", entity.getId(),
                     entity.getComponent().getType(), 
entity.getComponent().getState(), desiredState);
 
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
index fc3794b4ed..772685fb00 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
@@ -55,6 +55,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.zip.GZIPInputStream;
@@ -581,6 +582,7 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
         getClientUtil().enableControllerService(countC);
         getClientUtil().enableControllerService(countB);
         getClientUtil().enableControllerService(countA);
+        
getClientUtil().waitForControllerServicesEnabled(countC.getParentGroupId(), 
List.of(countC.getId(), countB.getId(), countA.getId()));
 
         getClientUtil().startProcessor(countFlowFiles);
 
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/controllerservice/ControllerServiceApiValidationIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/controllerservice/ControllerServiceApiValidationIT.java
index f72954939e..3a51e5aa8a 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/controllerservice/ControllerServiceApiValidationIT.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/controllerservice/ControllerServiceApiValidationIT.java
@@ -40,7 +40,7 @@ public class ControllerServiceApiValidationIT extends 
NiFiSystemIT {
         runStatusEntity.setState("ENABLED");
         runStatusEntity.setRevision(fakeServiceEntity.getRevision());
         
getNifiClient().getControllerServicesClient().activateControllerService(fakeServiceEntity.getId(),
 runStatusEntity);
-        getClientUtil().waitForControllerSerivcesEnabled("root");
+        getClientUtil().waitForControllerServicesEnabled("root");
         String controllerStatus = 
getNifiClient().getControllerServicesClient().getControllerService(fakeServiceEntity.getId()).getStatus().getRunStatus();
         String processorStatus = 
getNifiClient().getProcessorClient().getProcessor(fakeProcessorEntity.getId()).getStatus().getRunStatus();
 
@@ -58,7 +58,7 @@ public class ControllerServiceApiValidationIT extends 
NiFiSystemIT {
         runStatusEntity.setState("ENABLED");
         runStatusEntity.setRevision(fakeServiceEntity.getRevision());
         
getNifiClient().getControllerServicesClient().activateControllerService(fakeServiceEntity.getId(),
 runStatusEntity);
-        getClientUtil().waitForControllerSerivcesEnabled("root");
+        getClientUtil().waitForControllerServicesEnabled("root");
         String controllerStatus = 
getNifiClient().getControllerServicesClient().getControllerService(fakeServiceEntity.getId()).getStatus().getRunStatus();
         String processorStatus = 
getNifiClient().getProcessorClient().getProcessor(fakeProcessorEntity.getId()).getStatus().getRunStatus();
 
@@ -79,7 +79,7 @@ public class ControllerServiceApiValidationIT extends 
NiFiSystemIT {
         getClientUtil().updateProcessorProperties(processor, 
Collections.singletonMap("Fake Service", controllerService.getId()));
         getClientUtil().enableControllerService(controllerService);
 
-        getClientUtil().waitForControllerSerivcesEnabled("root");
+        getClientUtil().waitForControllerServicesEnabled("root");
 
         final String controllerStatus = 
getNifiClient().getControllerServicesClient().getControllerService(controllerService.getId()).getStatus().getRunStatus();
         assertEquals("ENABLED", controllerStatus);
@@ -103,7 +103,7 @@ public class ControllerServiceApiValidationIT extends 
NiFiSystemIT {
         runStatusEntity.setState("ENABLED");
         runStatusEntity.setRevision(controllerService.getRevision());
         
getNifiClient().getControllerServicesClient().activateControllerService(controllerService.getId(),
 runStatusEntity);
-        getClientUtil().waitForControllerSerivcesEnabled("root");
+        getClientUtil().waitForControllerServicesEnabled("root");
 
         final String controllerStatus = 
getNifiClient().getControllerServicesClient().getControllerService(controllerService.getId()).getStatus().getRunStatus();
         assertEquals("ENABLED", controllerStatus);
@@ -126,7 +126,7 @@ public class ControllerServiceApiValidationIT extends 
NiFiSystemIT {
         runStatusEntity.setState("ENABLED");
         runStatusEntity.setRevision(fakeServiceEntity.getRevision());
         
getNifiClient().getControllerServicesClient().activateControllerService(fakeServiceEntity.getId(),
 runStatusEntity);
-        getClientUtil().waitForControllerSerivcesEnabled("root");
+        getClientUtil().waitForControllerServicesEnabled("root");
         String controllerStatus = 
getNifiClient().getControllerServicesClient().getControllerService(fakeServiceEntity.getId()).getStatus().getRunStatus();
         String processorStatus = 
getNifiClient().getProcessorClient().getProcessor(fakeProcessorEntity.getId()).getStatus().getRunStatus();
 
@@ -144,7 +144,7 @@ public class ControllerServiceApiValidationIT extends 
NiFiSystemIT {
         runStatusEntity.setState("ENABLED");
         runStatusEntity.setRevision(fakeServiceEntity.getRevision());
         
getNifiClient().getControllerServicesClient().activateControllerService(fakeServiceEntity.getId(),
 runStatusEntity);
-        getClientUtil().waitForControllerSerivcesEnabled("root");
+        getClientUtil().waitForControllerServicesEnabled("root");
         String controllerStatus = 
getNifiClient().getControllerServicesClient().getControllerService(fakeServiceEntity.getId()).getStatus().getRunStatus();
         String processorStatus = 
getNifiClient().getProcessorClient().getProcessor(fakeProcessorEntity.getId()).getStatus().getRunStatus();
 
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/controllerservice/ControllerServiceLifecycleIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/controllerservice/ControllerServiceLifecycleIT.java
index 7a502193a6..7d3c262da2 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/controllerservice/ControllerServiceLifecycleIT.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/controllerservice/ControllerServiceLifecycleIT.java
@@ -42,7 +42,7 @@ public class ControllerServiceLifecycleIT extends 
NiFiSystemIT {
         }
 
         getClientUtil().enableControllerServices("root", false);
-        getClientUtil().waitForControllerSerivcesEnabled("root", 
countServiceIds);
+        getClientUtil().waitForControllerServicesEnabled("root", 
countServiceIds);
     }
 
     @Test
@@ -51,7 +51,7 @@ public class ControllerServiceLifecycleIT extends 
NiFiSystemIT {
         getClientUtil().updateControllerServiceProperties(service, 
Collections.singletonMap("Enable Failure Count", "1"));
 
         getClientUtil().enableControllerServices("root", false);
-        getClientUtil().waitForControllerSerivcesEnabled("root");
+        getClientUtil().waitForControllerServicesEnabled("root");
     }
 
 }
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/controllerservice/DependentControllerServiceIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/controllerservice/DependentControllerServiceIT.java
index 2a60b107bb..96df3a830b 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/controllerservice/DependentControllerServiceIT.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/controllerservice/DependentControllerServiceIT.java
@@ -63,18 +63,18 @@ public class DependentControllerServiceIT extends 
NiFiSystemIT {
         updateReferencesEntity.setState("ENABLED");
         
getNifiClient().getControllerServicesClient().updateControllerServiceReferences(updateReferencesEntity);
 
-        getClientUtil().waitForControllerSerivcesEnabled("root");
+        getClientUtil().waitForControllerServicesEnabled("root");
 
         // Disable the referencing services.
         updateReferencesEntity.setState("DISABLED");
         
getNifiClient().getControllerServicesClient().updateControllerServiceReferences(updateReferencesEntity);
-        getClientUtil().waitForControllerSerivcesDisabled("root", 
referencingService.getId());
+        getClientUtil().waitForControllerServicesDisabled("root", 
referencingService.getId());
 
         // Disable the Sleep On Validation Service.
         runStatusEntity.setState("DISABLED");
         
getNifiClient().getControllerServicesClient().activateControllerService(sleepOnValidation.getId(),
 runStatusEntity);
 
         // Wait for all services to become disabled.
-        getClientUtil().waitForControllerSerivcesDisabled("root");
+        getClientUtil().waitForControllerServicesDisabled("root");
     }
 }
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/nar/NarProviderAndAutoLoaderIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/nar/NarProviderAndAutoLoaderIT.java
index 523ed90c23..05ad540095 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/nar/NarProviderAndAutoLoaderIT.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/nar/NarProviderAndAutoLoaderIT.java
@@ -59,7 +59,7 @@ public class NarProviderAndAutoLoaderIT extends NiFiSystemIT {
                 ROOT_GROUP_ID, NIFI_GROUP_ID, 
CLASSLOADER_INFO_SERVICE_NAR_ARTIFACT, getNiFiVersion());
 
         getClientUtil().enableControllerService(classLoaderInfoService);
-        
getClientUtil().waitForControllerSerivcesEnabled(classLoaderInfoService.getParentGroupId(),
 classLoaderInfoService.getId());
+        
getClientUtil().waitForControllerServicesEnabled(classLoaderInfoService.getParentGroupId(),
 classLoaderInfoService.getId());
 
         final ProcessorEntity getClassLoaderInfo = 
getClientUtil().createProcessor(CLASSLOADER_INFO_PROCESSOR_CLASS_NAME,
                 NIFI_GROUP_ID, CLASSLOADER_INFO_PROCESSOR_NAR_ARTIFACT, 
getNiFiVersion());

Reply via email to