This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new d664708dd3 NIFI-10049: When unscheduling reporting task, increment its
concurrent task count until we've finished all shutdown logic and then
decrement it, in much the same way that we do for processors
d664708dd3 is described below
commit d664708dd336b7e133a39f69a790aa40f087d347
Author: Mark Payne <[email protected]>
AuthorDate: Tue May 24 13:25:37 2022 -0400
NIFI-10049: When unscheduling reporting task, increment its concurrent task
count until we've finished all shutdown logic and then decrement it, in much
the same way that we do for processors
This closes #6076
Signed-off-by: David Handermann <[email protected]>
---
.../nifi/controller/scheduling/StandardProcessScheduler.java | 11 ++++++++++-
1 file changed, 10 insertions(+), 1 deletion(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index d5d136b176..4be5820069 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -276,6 +276,11 @@ public final class StandardProcessScheduler implements
ProcessScheduler {
}
taskNode.verifyCanStop();
+
+ // Increment the Active Thread Count in order to ensure that we don't
consider the Reporting Task completely stopped until we've run
+ // all lifecycle methods, such as @OnStopped
+ lifecycleState.incrementActiveThreadCount(null);
+
final SchedulingAgent agent =
getSchedulingAgent(taskNode.getSchedulingStrategy());
final ReportingTask reportingTask = taskNode.getReportingTask();
taskNode.setScheduledState(ScheduledState.STOPPED);
@@ -304,10 +309,14 @@ public final class StandardProcessScheduler implements
ProcessScheduler {
agent.unschedule(taskNode, lifecycleState);
- if (lifecycleState.getActiveThreadCount() == 0 &&
lifecycleState.mustCallOnStoppedMethods()) {
+ // If active thread count == 1, that indicates that all
execution threads have completed. We use 1 here instead of 0 because
+ // when the Reporting Task is unscheduled, we immediately
increment the thread count to 1 as an indicator that we've not completely
finished.
+ if (lifecycleState.getActiveThreadCount() == 1 &&
lifecycleState.mustCallOnStoppedMethods()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class,
reportingTask, configurationContext);
future.complete(null);
}
+
+ lifecycleState.decrementActiveThreadCount();
}
}
};