This is an automated email from the ASF dual-hosted git repository.
tpalfy pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 1b4780c80d NIFI-13829: Mitigate false positive reports of
MonitorActivity, in case of infrequent Flow Files
1b4780c80d is described below
commit 1b4780c80d0a49e9cca991f1823ae009abd93fa2
Author: Rajmund Takacs <[email protected]>
AuthorDate: Wed Oct 2 14:41:53 2024 +0200
NIFI-13829: Mitigate false positive reports of MonitorActivity, in case of
infrequent Flow Files
This closes #9333.
Signed-off-by: Tamas Palfy <[email protected]>
(cherry picked from commit 7da06aab6296b6d32d0133dbbdd85c489a5709f1)
---
.../nifi/processors/standard/MonitorActivity.java | 38 ++++++++++++++--------
.../processors/standard/TestMonitorActivity.java | 38 ++++++++++++++++++++++
2 files changed, 63 insertions(+), 13 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
index 43efc3f82b..975e8f2b4d 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
@@ -185,7 +185,7 @@ public class MonitorActivity extends AbstractProcessor {
private final AtomicBoolean connectedWhenLastTriggered = new
AtomicBoolean(false);
private final AtomicLong lastInactiveMessage = new AtomicLong();
- private final AtomicLong inactivityStartMillis = new
AtomicLong(System.currentTimeMillis());
+ private final AtomicLong inactivityStartMillis = new
AtomicLong(nowMillis());
private final AtomicBoolean wasActive = new AtomicBoolean(true);
private volatile LocalFlowActivityInfo localFlowActivityInfo;
@@ -303,7 +303,7 @@ public class MonitorActivity extends AbstractProcessor {
final boolean isActive = localFlowActivityInfo.isActive() ||
!flowFiles.isEmpty();
final long lastActivity = localFlowActivityInfo.getLastActivity();
final long inactivityStartMillis = this.inactivityStartMillis.get();
- final boolean timeToRepeatInactiveMessage = (lastInactiveMessage.get()
+ thresholdMillis) <= System.currentTimeMillis();
+ final boolean timeToRepeatInactiveMessage = (lastInactiveMessage.get()
+ thresholdMillis) <= nowMillis();
final boolean canReport = !isClusterScope || isConnectedToCluster ||
!flowFiles.isEmpty();
final boolean canChangeState = !waitForActivity ||
localFlowActivityInfo.hasSuccessfulTransfer();
@@ -323,10 +323,14 @@ public class MonitorActivity extends AbstractProcessor {
}
}
- protected long getStartupTime() {
+ protected long nowMillis() {
return System.currentTimeMillis();
}
+ protected long getStartupTime() {
+ return nowMillis();
+ }
+
protected final long getLastSuccessfulTransfer() {
return localFlowActivityInfo.getLastSuccessfulTransfer();
}
@@ -371,7 +375,7 @@ public class MonitorActivity extends AbstractProcessor {
if (shouldThisNodeReport) {
sendInactivityMarker(context, session, lastActivity, logger);
}
- lastInactiveMessage.set(System.currentTimeMillis());
+ lastInactiveMessage.set(nowMillis());
setInactivityFlag(context.getStateManager());
}
@@ -460,7 +464,7 @@ public class MonitorActivity extends AbstractProcessor {
inactiveFlowFile = session.putAttribute(
inactiveFlowFile,
"inactivityDurationMillis",
- String.valueOf(System.currentTimeMillis() -
inactivityStartMillis)
+ String.valueOf(nowMillis() - inactivityStartMillis)
);
final byte[] outBytes =
context.getProperty(INACTIVITY_MESSAGE).evaluateAttributeExpressions(inactiveFlowFile).getValue().getBytes(
@@ -481,7 +485,8 @@ public class MonitorActivity extends AbstractProcessor {
activityRestoredFlowFile =
session.putAttribute(activityRestoredFlowFile, "inactivityStartMillis",
String.valueOf(
inactivityStartMillis));
- activityRestoredFlowFile =
session.putAttribute(activityRestoredFlowFile, "inactivityDurationMillis",
String.valueOf(System.currentTimeMillis() - inactivityStartMillis));
+ activityRestoredFlowFile =
session.putAttribute(activityRestoredFlowFile, "inactivityDurationMillis",
String.valueOf(
+ nowMillis() - inactivityStartMillis));
final byte[] outBytes =
context.getProperty(ACTIVITY_RESTORED_MESSAGE).evaluateAttributeExpressions(activityRestoredFlowFile).getValue().getBytes(
StandardCharsets.UTF_8);
@@ -492,12 +497,14 @@ public class MonitorActivity extends AbstractProcessor {
logger.info("Transferred {} to 'activity.restored'",
activityRestoredFlowFile);
}
- private static class LocalFlowActivityInfo {
+ private class LocalFlowActivityInfo {
private static final long NO_VALUE = 0;
+ private static final int TIMES_SYNC_WITHIN_THRESHOLD = 3;
private final long startupTimeMillis;
private final long thresholdMillis;
private final boolean saveAttributes;
+ private final long syncPeriodMillis;
private long nextSyncMillis = NO_VALUE;
private long lastSuccessfulTransfer = NO_VALUE;
@@ -507,6 +514,7 @@ public class MonitorActivity extends AbstractProcessor {
this.startupTimeMillis = startupTimeMillis;
this.thresholdMillis = thresholdMillis;
this.saveAttributes = saveAttributes;
+ this.syncPeriodMillis = thresholdMillis /
TIMES_SYNC_WITHIN_THRESHOLD;
}
public LocalFlowActivityInfo(long startupTimeMillis, long
thresholdMillis, boolean saveAttributes, long initialLastSuccessfulTransfer) {
@@ -515,22 +523,22 @@ public class MonitorActivity extends AbstractProcessor {
}
public boolean syncNeeded() {
- return nextSyncMillis <= System.currentTimeMillis();
+ return nextSyncMillis <= nowMillis();
}
public void setNextSyncMillis() {
- nextSyncMillis = System.currentTimeMillis() + (thresholdMillis /
3);
+ nextSyncMillis = nowMillis() + syncPeriodMillis;
}
public void forceSync() {
- nextSyncMillis = System.currentTimeMillis();
+ nextSyncMillis = nowMillis();
}
public boolean isActive() {
if (hasSuccessfulTransfer()) {
- return System.currentTimeMillis() < (lastSuccessfulTransfer +
thresholdMillis);
+ return nowMillis() < (lastSuccessfulTransfer +
thresholdMillis);
} else {
- return System.currentTimeMillis() < (startupTimeMillis +
thresholdMillis);
+ return nowMillis() < (startupTimeMillis + thresholdMillis);
}
}
@@ -555,7 +563,11 @@ public class MonitorActivity extends AbstractProcessor {
}
public void update(FlowFile flowFile) {
- this.lastSuccessfulTransfer = System.currentTimeMillis();
+ final long now = nowMillis();
+ if ((now - this.getLastActivity()) > syncPeriodMillis) {
+ this.forceSync(); // Immediate synchronization if Flow Files
are infrequent, to mitigate false reports
+ }
+ this.lastSuccessfulTransfer = now;
if (saveAttributes) {
lastSuccessfulTransferAttributes = new
HashMap<>(flowFile.getAttributes());
lastSuccessfulTransferAttributes.remove(CoreAttributes.UUID.key());
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java
index da5c288005..387f71b5bf 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java
@@ -30,6 +30,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
@@ -1375,4 +1376,41 @@ public class TestMonitorActivity {
runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 3);
runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
}
+
+ @Test
+ public void testInfrequentFlowFilesTriggerImmediateSynchronization()
throws IOException, InterruptedException {
+ final long threshold_seconds = 30;
+ final long startup_time_seconds = 1;
+ final AtomicLong nowProvider = new
AtomicLong(TimeUnit.SECONDS.toMillis(startup_time_seconds));
+ final TestRunner runner = TestRunners.newTestRunner(new
MonitorActivity() {
+ @Override
+ protected long nowMillis() {
+ return nowProvider.get();
+ }
+ });
+ runner.setIsConfiguredForClustering(true);
+ runner.setConnected(true);
+ runner.setPrimaryNode(false);
+ runner.setProperty(MonitorActivity.MONITORING_SCOPE,
MonitorActivity.SCOPE_CLUSTER);
+ runner.setProperty(MonitorActivity.THRESHOLD, threshold_seconds + "
seconds");
+
+ // Initialize
+ runner.run(1, false);
+ final String state_0 =
runner.getStateManager().getState(Scope.CLUSTER).get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO);
+ assertNull(state_0);
+
+ // First ever FlowFile triggers sync
+ runner.enqueue("Incoming data 1");
+ runNext(runner);
+ final String state_1 =
runner.getStateManager().getState(Scope.CLUSTER).get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO);
+ assertNotNull(state_1);
+
+ // Wait > (2/3 * T)
+ nowProvider.set(TimeUnit.SECONDS.toMillis(startup_time_seconds + ((2 *
threshold_seconds) / 3) + 1));
+ runNext(runner);
+ runner.enqueue("Incoming data 2");
+ runNext(runner);
+ final String state_2 =
runner.getStateManager().getState(Scope.CLUSTER).get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO);
+ assertNotEquals(state_1, state_2);
+ }
}