This is an automated email from the ASF dual-hosted git repository.

tpalfy pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 1b4780c80d NIFI-13829: Mitigate false positive reports of 
MonitorActivity, in case of infrequent Flow Files
1b4780c80d is described below

commit 1b4780c80d0a49e9cca991f1823ae009abd93fa2
Author: Rajmund Takacs <[email protected]>
AuthorDate: Wed Oct 2 14:41:53 2024 +0200

    NIFI-13829: Mitigate false positive reports of MonitorActivity, in case of 
infrequent Flow Files
    
    This closes #9333.
    
    Signed-off-by: Tamas Palfy <[email protected]>
    (cherry picked from commit 7da06aab6296b6d32d0133dbbdd85c489a5709f1)
---
 .../nifi/processors/standard/MonitorActivity.java  | 38 ++++++++++++++--------
 .../processors/standard/TestMonitorActivity.java   | 38 ++++++++++++++++++++++
 2 files changed, 63 insertions(+), 13 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
index 43efc3f82b..975e8f2b4d 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
@@ -185,7 +185,7 @@ public class MonitorActivity extends AbstractProcessor {
 
     private final AtomicBoolean connectedWhenLastTriggered = new 
AtomicBoolean(false);
     private final AtomicLong lastInactiveMessage = new AtomicLong();
-    private final AtomicLong inactivityStartMillis = new 
AtomicLong(System.currentTimeMillis());
+    private final AtomicLong inactivityStartMillis = new 
AtomicLong(nowMillis());
     private final AtomicBoolean wasActive = new AtomicBoolean(true);
 
     private volatile LocalFlowActivityInfo localFlowActivityInfo;
@@ -303,7 +303,7 @@ public class MonitorActivity extends AbstractProcessor {
         final boolean isActive = localFlowActivityInfo.isActive() || 
!flowFiles.isEmpty();
         final long lastActivity = localFlowActivityInfo.getLastActivity();
         final long inactivityStartMillis = this.inactivityStartMillis.get();
-        final boolean timeToRepeatInactiveMessage = (lastInactiveMessage.get() 
+ thresholdMillis) <= System.currentTimeMillis();
+        final boolean timeToRepeatInactiveMessage = (lastInactiveMessage.get() 
+ thresholdMillis) <= nowMillis();
 
         final boolean canReport = !isClusterScope || isConnectedToCluster || 
!flowFiles.isEmpty();
         final boolean canChangeState = !waitForActivity || 
localFlowActivityInfo.hasSuccessfulTransfer();
@@ -323,10 +323,14 @@ public class MonitorActivity extends AbstractProcessor {
         }
     }
 
-    protected long getStartupTime() {
+    protected long nowMillis() {
         return System.currentTimeMillis();
     }
 
+    protected long getStartupTime() {
+        return nowMillis();
+    }
+
     protected final long getLastSuccessfulTransfer() {
         return localFlowActivityInfo.getLastSuccessfulTransfer();
     }
@@ -371,7 +375,7 @@ public class MonitorActivity extends AbstractProcessor {
         if (shouldThisNodeReport) {
             sendInactivityMarker(context, session, lastActivity, logger);
         }
-        lastInactiveMessage.set(System.currentTimeMillis());
+        lastInactiveMessage.set(nowMillis());
         setInactivityFlag(context.getStateManager());
     }
 
@@ -460,7 +464,7 @@ public class MonitorActivity extends AbstractProcessor {
         inactiveFlowFile = session.putAttribute(
                 inactiveFlowFile,
                 "inactivityDurationMillis",
-                String.valueOf(System.currentTimeMillis() - 
inactivityStartMillis)
+                String.valueOf(nowMillis() - inactivityStartMillis)
         );
 
         final byte[] outBytes = 
context.getProperty(INACTIVITY_MESSAGE).evaluateAttributeExpressions(inactiveFlowFile).getValue().getBytes(
@@ -481,7 +485,8 @@ public class MonitorActivity extends AbstractProcessor {
 
         activityRestoredFlowFile = 
session.putAttribute(activityRestoredFlowFile, "inactivityStartMillis", 
String.valueOf(
                 inactivityStartMillis));
-        activityRestoredFlowFile = 
session.putAttribute(activityRestoredFlowFile, "inactivityDurationMillis", 
String.valueOf(System.currentTimeMillis() - inactivityStartMillis));
+        activityRestoredFlowFile = 
session.putAttribute(activityRestoredFlowFile, "inactivityDurationMillis", 
String.valueOf(
+                nowMillis() - inactivityStartMillis));
 
         final byte[] outBytes = 
context.getProperty(ACTIVITY_RESTORED_MESSAGE).evaluateAttributeExpressions(activityRestoredFlowFile).getValue().getBytes(
                 StandardCharsets.UTF_8);
@@ -492,12 +497,14 @@ public class MonitorActivity extends AbstractProcessor {
         logger.info("Transferred {} to 'activity.restored'", 
activityRestoredFlowFile);
     }
 
-    private static class LocalFlowActivityInfo {
+    private class LocalFlowActivityInfo {
         private static final long NO_VALUE = 0;
+        private static final int TIMES_SYNC_WITHIN_THRESHOLD = 3;
 
         private final long startupTimeMillis;
         private final long thresholdMillis;
         private final boolean saveAttributes;
+        private final long syncPeriodMillis;
 
         private long nextSyncMillis = NO_VALUE;
         private long lastSuccessfulTransfer = NO_VALUE;
@@ -507,6 +514,7 @@ public class MonitorActivity extends AbstractProcessor {
             this.startupTimeMillis = startupTimeMillis;
             this.thresholdMillis = thresholdMillis;
             this.saveAttributes = saveAttributes;
+            this.syncPeriodMillis = thresholdMillis / 
TIMES_SYNC_WITHIN_THRESHOLD;
         }
 
         public LocalFlowActivityInfo(long startupTimeMillis, long 
thresholdMillis, boolean saveAttributes, long initialLastSuccessfulTransfer) {
@@ -515,22 +523,22 @@ public class MonitorActivity extends AbstractProcessor {
         }
 
         public boolean syncNeeded() {
-            return nextSyncMillis <= System.currentTimeMillis();
+            return nextSyncMillis <= nowMillis();
         }
 
         public void setNextSyncMillis() {
-            nextSyncMillis = System.currentTimeMillis() + (thresholdMillis / 
3);
+            nextSyncMillis = nowMillis() + syncPeriodMillis;
         }
 
         public void forceSync() {
-            nextSyncMillis = System.currentTimeMillis();
+            nextSyncMillis = nowMillis();
         }
 
         public boolean isActive() {
             if (hasSuccessfulTransfer()) {
-                return System.currentTimeMillis() < (lastSuccessfulTransfer + 
thresholdMillis);
+                return nowMillis() < (lastSuccessfulTransfer + 
thresholdMillis);
             } else {
-                return System.currentTimeMillis() < (startupTimeMillis + 
thresholdMillis);
+                return nowMillis() < (startupTimeMillis + thresholdMillis);
             }
         }
 
@@ -555,7 +563,11 @@ public class MonitorActivity extends AbstractProcessor {
         }
 
         public void update(FlowFile flowFile) {
-            this.lastSuccessfulTransfer = System.currentTimeMillis();
+            final long now = nowMillis();
+            if ((now - this.getLastActivity()) > syncPeriodMillis) {
+                this.forceSync(); // Immediate synchronization if Flow Files 
are infrequent, to mitigate false reports
+            }
+            this.lastSuccessfulTransfer = now;
             if (saveAttributes) {
                 lastSuccessfulTransferAttributes = new 
HashMap<>(flowFile.getAttributes());
                 
lastSuccessfulTransferAttributes.remove(CoreAttributes.UUID.key());
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java
index da5c288005..387f71b5bf 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java
@@ -30,6 +30,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Pattern;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateManager;
@@ -1375,4 +1376,41 @@ public class TestMonitorActivity {
         runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 3);
         runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
     }
+
+    @Test
+    public void testInfrequentFlowFilesTriggerImmediateSynchronization() 
throws IOException, InterruptedException {
+        final long threshold_seconds = 30;
+        final long startup_time_seconds = 1;
+        final AtomicLong nowProvider = new 
AtomicLong(TimeUnit.SECONDS.toMillis(startup_time_seconds));
+        final TestRunner runner = TestRunners.newTestRunner(new 
MonitorActivity() {
+            @Override
+            protected long nowMillis() {
+                return nowProvider.get();
+            }
+        });
+        runner.setIsConfiguredForClustering(true);
+        runner.setConnected(true);
+        runner.setPrimaryNode(false);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, 
MonitorActivity.SCOPE_CLUSTER);
+        runner.setProperty(MonitorActivity.THRESHOLD, threshold_seconds + " 
seconds");
+
+        // Initialize
+        runner.run(1, false);
+        final String state_0 = 
runner.getStateManager().getState(Scope.CLUSTER).get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO);
+        assertNull(state_0);
+
+        // First ever FlowFile triggers sync
+        runner.enqueue("Incoming data 1");
+        runNext(runner);
+        final String state_1 = 
runner.getStateManager().getState(Scope.CLUSTER).get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO);
+        assertNotNull(state_1);
+
+        // Wait > (2/3 * T)
+        nowProvider.set(TimeUnit.SECONDS.toMillis(startup_time_seconds + ((2 * 
threshold_seconds) / 3) + 1));
+        runNext(runner);
+        runner.enqueue("Incoming data 2");
+        runNext(runner);
+        final String state_2 = 
runner.getStateManager().getState(Scope.CLUSTER).get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO);
+        assertNotEquals(state_1, state_2);
+    }
 }

Reply via email to