This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new d664708dd3 NIFI-10049: When unscheduling reporting task, increment its concurrent task count until we've finished all shutdown logic and then decrement it, in much the same way that we do for processors
d664708dd3 is described below

commit d664708dd336b7e133a39f69a790aa40f087d347
Author: Mark Payne <[email protected]>
AuthorDate: Tue May 24 13:25:37 2022 -0400

    NIFI-10049: When unscheduling reporting task, increment its concurrent task count until we've finished all shutdown logic and then decrement it, in much the same way that we do for processors
    
    This closes #6076
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../nifi/controller/scheduling/StandardProcessScheduler.java  | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index d5d136b176..4be5820069 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -276,6 +276,11 @@ public final class StandardProcessScheduler implements ProcessScheduler {
         }
 
         taskNode.verifyCanStop();
+
+        // Increment the Active Thread Count in order to ensure that we don't consider the Reporting Task completely stopped until we've run
+        // all lifecycle methods, such as @OnStopped
+        lifecycleState.incrementActiveThreadCount(null);
+
         final SchedulingAgent agent = 
getSchedulingAgent(taskNode.getSchedulingStrategy());
         final ReportingTask reportingTask = taskNode.getReportingTask();
         taskNode.setScheduledState(ScheduledState.STOPPED);
@@ -304,10 +309,14 @@ public final class StandardProcessScheduler implements ProcessScheduler {
 
                     agent.unschedule(taskNode, lifecycleState);
 
-                    if (lifecycleState.getActiveThreadCount() == 0 && 
lifecycleState.mustCallOnStoppedMethods()) {
+                    // If active thread count == 1, that indicates that all execution threads have completed. We use 1 here instead of 0 because
+                    // when the Reporting Task is unscheduled, we immediately increment the thread count to 1 as an indicator that we've not completely finished.
+                    if (lifecycleState.getActiveThreadCount() == 1 && 
lifecycleState.mustCallOnStoppedMethods()) {
                         
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, 
reportingTask, configurationContext);
                         future.complete(null);
                     }
+
+                    lifecycleState.decrementActiveThreadCount();
                 }
             }
         };

Reply via email to