This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new f00f0ad  NIFI-8314: Generate warning for long-running processor tasks
f00f0ad is described below

commit f00f0ad269c2fab8ce7b88fb6226fdf88ffce33a
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Tue Mar 16 17:56:50 2021 +0100

    NIFI-8314: Generate warning for long-running processor tasks
---
 .../java/org/apache/nifi/util/NiFiProperties.java  |  8 +++
 .../components/monitor/LongRunningTaskMonitor.java | 69 ++++++++++++++++++++++
 .../org/apache/nifi/controller/FlowController.java | 25 ++++++++
 .../nifi-framework/nifi-resources/pom.xml          |  4 ++
 .../src/main/resources/conf/nifi.properties        |  4 ++
 5 files changed, 110 insertions(+)

diff --git 
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
 
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index b33b6f7..1bdf495 100644
--- 
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++ 
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -288,6 +288,10 @@ public abstract class NiFiProperties {
     public static final String ANALYTICS_CONNECTION_MODEL_SCORE_NAME = 
"nifi.analytics.connection.model.score.name";
     public static final String ANALYTICS_CONNECTION_MODEL_SCORE_THRESHOLD = 
"nifi.analytics.connection.model.score.threshold";
 
+    // runtime monitoring properties
+    public static final String MONITOR_LONG_RUNNING_TASK_SCHEDULE = 
"nifi.monitor.long.running.task.schedule";
+    public static final String MONITOR_LONG_RUNNING_TASK_THRESHOLD = 
"nifi.monitor.long.running.task.threshold";
+
     // defaults
     public static final Boolean DEFAULT_AUTO_RESUME_STATE = true;
     public static final String DEFAULT_AUTHORIZER_CONFIGURATION_FILE = 
"conf/authorizers.xml";
@@ -371,6 +375,10 @@ public abstract class NiFiProperties {
     public static final String DEFAULT_ANALYTICS_CONNECTION_SCORE_NAME = 
"rSquared";
     public static final double DEFAULT_ANALYTICS_CONNECTION_SCORE_THRESHOLD = 
.90;
 
+    // runtime monitoring defaults
+    public static final String DEFAULT_MONITOR_LONG_RUNNING_TASK_SCHEDULE = "1 
min";
+    public static final String DEFAULT_MONITOR_LONG_RUNNING_TASK_THRESHOLD = 
"5 mins";
+
     // Status repository defaults
     public static final int 
DEFAULT_COMPONENT_STATUS_REPOSITORY_PERSIST_NODE_DAYS = 14;
     public static final int 
DEFAULT_COMPONENT_STATUS_REPOSITORY_PERSIST_COMPONENT_DAYS = 3;
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/monitor/LongRunningTaskMonitor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/monitor/LongRunningTaskMonitor.java
new file mode 100644
index 0000000..eaf3526
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/monitor/LongRunningTaskMonitor.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.components.monitor;
+
+import org.apache.nifi.controller.ActiveThreadInfo;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ThreadDetails;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class LongRunningTaskMonitor implements Runnable {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(LongRunningTaskMonitor.class);
+
+    private final FlowManager flowManager;
+    private final long thresholdMillis;
+
+    public LongRunningTaskMonitor(FlowManager flowManager, long 
thresholdMillis) {
+        this.flowManager = flowManager;
+        this.thresholdMillis = thresholdMillis;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.debug("Checking long running processor tasks...");
+
+        int activeThreadCount = 0;
+        int longRunningThreadCount = 0;
+
+        ThreadDetails threadDetails = ThreadDetails.capture();
+
+        for (ProcessorNode processorNode : 
flowManager.getRootGroup().findAllProcessors()) {
+            List<ActiveThreadInfo> activeThreads = 
processorNode.getActiveThreads(threadDetails);
+            activeThreadCount += activeThreads.size();
+
+            for (ActiveThreadInfo activeThread : activeThreads) {
+                if (activeThread.getActiveMillis() > thresholdMillis) {
+                    longRunningThreadCount++;
+
+                    LOGGER.warn(String.format("Long running task detected on 
processor [id=%s, type=%s, name=%s]. Thread name: %s; Active time: %,d; Stack 
trace:\n%s",
+                            processorNode.getIdentifier(), 
processorNode.getComponentType(), processorNode.getName(),
+                            activeThread.getThreadName(), 
activeThread.getActiveMillis(), activeThread.getStackTrace()));
+
+                    processorNode.getLogger().warn(String.format("Long running 
task detected on the processor [thread name: %s; active time: %,d].",
+                            activeThread.getThreadName(), 
activeThread.getActiveMillis()));
+                }
+            }
+        }
+
+        LOGGER.info("Active threads: {}; Long running threads: {}", 
activeThreadCount, longRunningThreadCount);
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 16d3347..d724550 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -40,6 +40,7 @@ import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.cluster.protocol.NodeProtocolSender;
 import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+import org.apache.nifi.components.monitor.LongRunningTaskMonitor;
 import org.apache.nifi.components.state.StateManagerProvider;
 import org.apache.nifi.components.validation.StandardValidationTrigger;
 import org.apache.nifi.components.validation.TriggerValidationTask;
@@ -310,6 +311,7 @@ public class FlowController implements 
ReportingTaskProvider, Authorizable, Node
     private final StandardFlowManager flowManager;
     private final RepositoryContextFactory repositoryContextFactory;
     private final RingBufferGarbageCollectionLog gcLog;
+    private final FlowEngine longRunningTaskMonitorThreadPool;
 
     /**
      * true if controller is configured to operate in a clustered environment
@@ -778,6 +780,8 @@ public class FlowController implements 
ReportingTaskProvider, Authorizable, Node
             loadBalanceServer = null;
             loadBalanceClientThreadPool = null;
         }
+
+        longRunningTaskMonitorThreadPool = new FlowEngine(1, "Long Running 
Task Monitor", true);
     }
 
     @Override
@@ -1092,11 +1096,32 @@ public class FlowController implements 
ReportingTaskProvider, Authorizable, Node
             for (final Connection connection : 
flowManager.findAllConnections()) {
                 connection.getFlowFileQueue().startLoadBalancing();
             }
+
+            scheduleLongRunningTaskMonitor();
         } finally {
             writeLock.unlock("onFlowInitialized");
         }
     }
 
+    private void scheduleLongRunningTaskMonitor() {
+        final long scheduleMillis = 
parseDurationPropertyToMillis(NiFiProperties.MONITOR_LONG_RUNNING_TASK_SCHEDULE,
 NiFiProperties.DEFAULT_MONITOR_LONG_RUNNING_TASK_SCHEDULE);
+        final long thresholdMillis = 
parseDurationPropertyToMillis(NiFiProperties.MONITOR_LONG_RUNNING_TASK_THRESHOLD,
 NiFiProperties.DEFAULT_MONITOR_LONG_RUNNING_TASK_THRESHOLD);
+
+        longRunningTaskMonitorThreadPool.scheduleWithFixedDelay(new 
LongRunningTaskMonitor(getFlowManager(), thresholdMillis), scheduleMillis, 
scheduleMillis, TimeUnit.MILLISECONDS);
+    }
+
+    private long parseDurationPropertyToMillis(String propertyName, String 
defaultValue) {
+        long durationMillis;
+        try {
+            final String duration = nifiProperties.getProperty(propertyName);
+            durationMillis = (long) 
FormatUtils.getPreciseTimeDuration(duration, TimeUnit.MILLISECONDS);
+        } catch (final Exception e) {
+            LOG.warn("Could not retrieve value for {}. This property has been 
set to '{}'", propertyName, defaultValue);
+            durationMillis = (long) 
FormatUtils.getPreciseTimeDuration(defaultValue, TimeUnit.MILLISECONDS);
+        }
+        return durationMillis;
+    }
+
     public boolean isStartAfterInitialization(final Connectable component) {
         return startConnectablesAfterInitialization.contains(component) || 
startRemoteGroupPortsAfterInitialization.contains(component);
     }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
index 216c2cd..4ebbb31 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
@@ -251,6 +251,10 @@
         
<nifi.analytics.connection.model.implementation>org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquares</nifi.analytics.connection.model.implementation>
         
<nifi.analytics.connection.model.score.name>rSquared</nifi.analytics.connection.model.score.name>
         
<nifi.analytics.connection.model.score.threshold>.90</nifi.analytics.connection.model.score.threshold>
+
+        <!-- nifi.properties: runtime monitoring properties -->
+        <nifi.monitor.long.running.task.schedule>1 
min</nifi.monitor.long.running.task.schedule>
+        <nifi.monitor.long.running.task.threshold>5 
mins</nifi.monitor.long.running.task.threshold>
     </properties>
     <build>
         <plugins>
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
index ecf271e..c28e5c9 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
@@ -311,3 +311,7 @@ 
nifi.analytics.query.interval=${nifi.analytics.query.interval}
 
nifi.analytics.connection.model.implementation=${nifi.analytics.connection.model.implementation}
 
nifi.analytics.connection.model.score.name=${nifi.analytics.connection.model.score.name}
 
nifi.analytics.connection.model.score.threshold=${nifi.analytics.connection.model.score.threshold}
+
+# runtime monitoring properties
+nifi.monitor.long.running.task.schedule=${nifi.monitor.long.running.task.schedule}
+nifi.monitor.long.running.task.threshold=${nifi.monitor.long.running.task.threshold}

Reply via email to