This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 105a76b  NIFI-8314: Add controller-level bulletin message for long-running tasks.
105a76b is described below

commit 105a76b7b7d665335d53111817867e9ab53b957c
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Wed Mar 17 13:48:56 2021 +0100

    NIFI-8314: Add controller-level bulletin message for long-running tasks.
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #4906.
---
 .../components/monitor/LongRunningTaskMonitor.java   | 20 ++++++++++++++------
 .../org/apache/nifi/controller/FlowController.java   |  3 ++-
 2 files changed, 16 insertions(+), 7 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/monitor/LongRunningTaskMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/monitor/LongRunningTaskMonitor.java
index eaf3526..27e4886 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/monitor/LongRunningTaskMonitor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/monitor/LongRunningTaskMonitor.java
@@ -20,6 +20,8 @@ import org.apache.nifi.controller.ActiveThreadInfo;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ThreadDetails;
 import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.reporting.Severity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,10 +32,12 @@ public class LongRunningTaskMonitor implements Runnable {
     private static final Logger LOGGER = LoggerFactory.getLogger(LongRunningTaskMonitor.class);
 
     private final FlowManager flowManager;
+    private final EventReporter eventReporter;
     private final long thresholdMillis;
 
-    public LongRunningTaskMonitor(FlowManager flowManager, long thresholdMillis) {
+    public LongRunningTaskMonitor(FlowManager flowManager, EventReporter eventReporter, long thresholdMillis) {
         this.flowManager = flowManager;
+        this.eventReporter = eventReporter;
         this.thresholdMillis = thresholdMillis;
     }
 
@@ -54,12 +58,16 @@ public class LongRunningTaskMonitor implements Runnable {
                 if (activeThread.getActiveMillis() > thresholdMillis) {
                     longRunningThreadCount++;
 
-                    LOGGER.warn(String.format("Long running task detected on processor [id=%s, type=%s, name=%s]. Thread name: %s; Active time: %,d; Stack trace:\n%s",
-                            processorNode.getIdentifier(), processorNode.getComponentType(), processorNode.getName(),
-                            activeThread.getThreadName(), activeThread.getActiveMillis(), activeThread.getStackTrace()));
+                    String taskSeconds = String.format("%,d seconds", activeThread.getActiveMillis() / 1000);
 
-                    processorNode.getLogger().warn(String.format("Long running task detected on the processor [thread name: %s; active time: %,d].",
-                            activeThread.getThreadName(), activeThread.getActiveMillis()));
+                    LOGGER.warn(String.format("Long running task detected on processor [id=%s, name=%s, type=%s]. Task time: %s. Stack trace:\n%s",
+                            processorNode.getIdentifier(), processorNode.getName(), processorNode.getComponentType(), taskSeconds, activeThread.getStackTrace()));
+
+                    eventReporter.reportEvent(Severity.WARNING, "Long Running Task", String.format("Processor with ID %s, Name %s and Type %s has a task that has been running for %s " +
+                            "(thread name: %s).", processorNode.getIdentifier(), processorNode.getName(), processorNode.getComponentType(), taskSeconds, activeThread.getThreadName()));
+
+                    processorNode.getLogger().warn(String.format("The processor has a task that has been running for %s (thread name: %s).",
+                            taskSeconds, activeThread.getThreadName()));
                 }
             }
         }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index d724550..46610ab 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -1107,7 +1107,8 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
         final long scheduleMillis = parseDurationPropertyToMillis(NiFiProperties.MONITOR_LONG_RUNNING_TASK_SCHEDULE, NiFiProperties.DEFAULT_MONITOR_LONG_RUNNING_TASK_SCHEDULE);
         final long thresholdMillis = parseDurationPropertyToMillis(NiFiProperties.MONITOR_LONG_RUNNING_TASK_THRESHOLD, NiFiProperties.DEFAULT_MONITOR_LONG_RUNNING_TASK_THRESHOLD);
 
-        longRunningTaskMonitorThreadPool.scheduleWithFixedDelay(new LongRunningTaskMonitor(getFlowManager(), thresholdMillis), scheduleMillis, scheduleMillis, TimeUnit.MILLISECONDS);
+        LongRunningTaskMonitor longRunningTaskMonitor = new LongRunningTaskMonitor(getFlowManager(), createEventReporter(), thresholdMillis);
+        longRunningTaskMonitorThreadPool.scheduleWithFixedDelay(longRunningTaskMonitor, scheduleMillis, scheduleMillis, TimeUnit.MILLISECONDS);
     }
 
     private long parseDurationPropertyToMillis(String propertyName, String defaultValue) {

Reply via email to