This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 105a76b NIFI-8314: Add controller-level bulletin message for long-running tasks.
105a76b is described below
commit 105a76b7b7d665335d53111817867e9ab53b957c
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Wed Mar 17 13:48:56 2021 +0100
NIFI-8314: Add controller-level bulletin message for long-running tasks.
Signed-off-by: Pierre Villard <[email protected]>
This closes #4906.
---
.../components/monitor/LongRunningTaskMonitor.java | 20 ++++++++++++++------
.../org/apache/nifi/controller/FlowController.java | 3 ++-
2 files changed, 16 insertions(+), 7 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/monitor/LongRunningTaskMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/monitor/LongRunningTaskMonitor.java
index eaf3526..27e4886 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/monitor/LongRunningTaskMonitor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/monitor/LongRunningTaskMonitor.java
@@ -20,6 +20,8 @@ import org.apache.nifi.controller.ActiveThreadInfo;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ThreadDetails;
import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.reporting.Severity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,10 +32,12 @@ public class LongRunningTaskMonitor implements Runnable {
private static final Logger LOGGER =
LoggerFactory.getLogger(LongRunningTaskMonitor.class);
private final FlowManager flowManager;
+ private final EventReporter eventReporter;
private final long thresholdMillis;
- public LongRunningTaskMonitor(FlowManager flowManager, long
thresholdMillis) {
+ public LongRunningTaskMonitor(FlowManager flowManager, EventReporter
eventReporter, long thresholdMillis) {
this.flowManager = flowManager;
+ this.eventReporter = eventReporter;
this.thresholdMillis = thresholdMillis;
}
@@ -54,12 +58,16 @@ public class LongRunningTaskMonitor implements Runnable {
if (activeThread.getActiveMillis() > thresholdMillis) {
longRunningThreadCount++;
- LOGGER.warn(String.format("Long running task detected on
processor [id=%s, type=%s, name=%s]. Thread name: %s; Active time: %,d; Stack
trace:\n%s",
- processorNode.getIdentifier(),
processorNode.getComponentType(), processorNode.getName(),
- activeThread.getThreadName(),
activeThread.getActiveMillis(), activeThread.getStackTrace()));
+ String taskSeconds = String.format("%,d seconds",
activeThread.getActiveMillis() / 1000);
- processorNode.getLogger().warn(String.format("Long running
task detected on the processor [thread name: %s; active time: %,d].",
- activeThread.getThreadName(),
activeThread.getActiveMillis()));
+ LOGGER.warn(String.format("Long running task detected on
processor [id=%s, name=%s, type=%s]. Task time: %s. Stack trace:\n%s",
+ processorNode.getIdentifier(),
processorNode.getName(), processorNode.getComponentType(), taskSeconds,
activeThread.getStackTrace()));
+
+ eventReporter.reportEvent(Severity.WARNING, "Long Running
Task", String.format("Processor with ID %s, Name %s and Type %s has a task that
has been running for %s " +
+ "(thread name: %s).",
processorNode.getIdentifier(), processorNode.getName(),
processorNode.getComponentType(), taskSeconds, activeThread.getThreadName()));
+
+ processorNode.getLogger().warn(String.format("The
processor has a task that has been running for %s (thread name: %s).",
+ taskSeconds, activeThread.getThreadName()));
}
}
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index d724550..46610ab 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -1107,7 +1107,8 @@ public class FlowController implements
ReportingTaskProvider, Authorizable, Node
final long scheduleMillis =
parseDurationPropertyToMillis(NiFiProperties.MONITOR_LONG_RUNNING_TASK_SCHEDULE,
NiFiProperties.DEFAULT_MONITOR_LONG_RUNNING_TASK_SCHEDULE);
final long thresholdMillis =
parseDurationPropertyToMillis(NiFiProperties.MONITOR_LONG_RUNNING_TASK_THRESHOLD,
NiFiProperties.DEFAULT_MONITOR_LONG_RUNNING_TASK_THRESHOLD);
- longRunningTaskMonitorThreadPool.scheduleWithFixedDelay(new
LongRunningTaskMonitor(getFlowManager(), thresholdMillis), scheduleMillis,
scheduleMillis, TimeUnit.MILLISECONDS);
+ LongRunningTaskMonitor longRunningTaskMonitor = new
LongRunningTaskMonitor(getFlowManager(), createEventReporter(),
thresholdMillis);
+
longRunningTaskMonitorThreadPool.scheduleWithFixedDelay(longRunningTaskMonitor,
scheduleMillis, scheduleMillis, TimeUnit.MILLISECONDS);
}
private long parseDurationPropertyToMillis(String propertyName, String
defaultValue) {