This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 2ea497f81c NIFI-13072: Fix MonitorActivity problems with cluster scope 
flow monitoring
2ea497f81c is described below

commit 2ea497f81c83606e2daa931b5ad7cb0a267ae850
Author: Rajmund Takacs <[email protected]>
AuthorDate: Thu Apr 4 17:09:59 2024 +0200

    NIFI-13072: Fix MonitorActivity problems with cluster scope flow monitoring
    
    This closes #8669.
    
    Signed-off-by: Peter Turcsanyi <[email protected]>
    
    (cherry picked from commit bffacdec982a9f3f0bf6004bcecbd21d1049e401)
---
 .../nifi/processors/standard/MonitorActivity.java  | 624 +++++++++++++--------
 .../processors/standard/TestMonitorActivity.java   | 615 +++++++++++++++++---
 2 files changed, 942 insertions(+), 297 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
index 7021f97fc9..43efc3f82b 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
@@ -16,6 +16,20 @@
  */
 package org.apache.nifi.processors.standard;
 
+import static java.util.Collections.singletonMap;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
@@ -30,7 +44,6 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.components.state.StateMap;
@@ -44,23 +57,7 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.util.StringUtils;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 @SideEffectFree
 @TriggerSerially
@@ -72,10 +69,18 @@ import java.util.concurrent.atomic.AtomicLong;
 @WritesAttributes({
     @WritesAttribute(attribute = "inactivityStartMillis", description = "The 
time at which Inactivity began, in the form of milliseconds since Epoch"),
     @WritesAttribute(attribute = "inactivityDurationMillis", description = 
"The number of milliseconds that the inactivity has spanned")})
-@Stateful(scopes = Scope.CLUSTER, description = "MonitorActivity stores the 
last timestamp at each node as state, so that it can examine activity at 
cluster wide." +
-        "If 'Copy Attribute' is set to true, then flow file attributes are 
also persisted.")
+@Stateful(
+        scopes = { Scope.CLUSTER, Scope.LOCAL },
+        description = "MonitorActivity stores the last timestamp at each node 
as state, "
+                + "so that it can examine activity at cluster wide. "
+                + "If 'Copy Attribute' is set to true, then flow file 
attributes are also persisted. "
+                + "In local scope, it stores last known activity timestamp if 
the flow is inactive."
+)
 public class MonitorActivity extends AbstractProcessor {
 
+    public static final String STATE_KEY_COMMON_FLOW_ACTIVITY_INFO = 
"CommonFlowActivityInfo.lastSuccessfulTransfer";
+    public static final String STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO = 
"LocalFlowActivityInfo.lastSuccessfulTransfer";
+
     public static final AllowableValue SCOPE_NODE = new AllowableValue("node");
     public static final AllowableValue SCOPE_CLUSTER = new 
AllowableValue("cluster");
     public static final AllowableValue REPORT_NODE_ALL = new 
AllowableValue("all");
@@ -113,6 +118,14 @@ public class MonitorActivity extends AbstractProcessor {
             .allowableValues("true", "false")
             .defaultValue("false")
             .build();
+    public static final PropertyDescriptor RESET_STATE_ON_RESTART = new 
PropertyDescriptor.Builder()
+            .name("Reset State on Restart")
+            .description("When the processor gets started or restarted, if set 
to true, the initial state will always be active. "
+                    + "Otherwise, the last reported flow state will be 
preserved.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
     public static final PropertyDescriptor INACTIVITY_MESSAGE = new 
PropertyDescriptor.Builder()
             .name("Inactivity Message")
             .description("The message that will be the content of FlowFiles 
that are sent to the 'inactive' relationship")
@@ -124,7 +137,7 @@ public class MonitorActivity extends AbstractProcessor {
     public static final PropertyDescriptor COPY_ATTRIBUTES = new 
PropertyDescriptor.Builder()
             .name("Copy Attributes")
             .description("If true, will copy all flow file attributes from the 
flow file that resumed activity to the newly created indicator flow file")
-            .required(false)
+            .required(true)
             .allowableValues("true", "false")
             .defaultValue("false")
             .build();
@@ -148,11 +161,7 @@ public class MonitorActivity extends AbstractProcessor {
                     " even if it's 'primary', NiFi act as 'all'.")
             .required(true)
             .allowableValues(REPORT_NODE_ALL, REPORT_NODE_PRIMARY)
-            .addValidator(((subject, input, context) -> {
-                boolean invalid = REPORT_NODE_PRIMARY.equals(input) && 
SCOPE_NODE.equals(context.getProperty(MONITORING_SCOPE).getValue());
-                return new 
ValidationResult.Builder().subject(subject).input(input)
-                        .explanation("'" + REPORT_NODE_PRIMARY + "' is only 
available with '" + SCOPE_CLUSTER + "' scope.").valid(!invalid).build();
-            }))
+            .dependsOn(MONITORING_SCOPE, SCOPE_CLUSTER)
             .defaultValue(REPORT_NODE_ALL.getValue())
             .build();
 
@@ -170,18 +179,16 @@ public class MonitorActivity extends AbstractProcessor {
             .description("This relationship is used to transfer an Activity 
Restored indicator when FlowFiles are routing to 'success' following a "
                     + "period of inactivity")
             .build();
-    public static final Charset UTF8 = Charset.forName("UTF-8");
 
     private List<PropertyDescriptor> properties;
     private Set<Relationship> relationships;
 
-    private final AtomicLong latestSuccessTransfer = new 
AtomicLong(System.currentTimeMillis());
-    private final AtomicLong latestReportedNodeState = new 
AtomicLong(System.currentTimeMillis());
-    private final AtomicBoolean inactive = new AtomicBoolean(false);
-    private final AtomicBoolean hasSuccessTransfer = new AtomicBoolean(false);
     private final AtomicBoolean connectedWhenLastTriggered = new 
AtomicBoolean(false);
-    private final AtomicLong lastInactiveMessage = new 
AtomicLong(System.currentTimeMillis());
-    public static final String STATE_KEY_LATEST_SUCCESS_TRANSFER = 
"MonitorActivity.latestSuccessTransfer";
+    private final AtomicLong lastInactiveMessage = new AtomicLong();
+    private final AtomicLong inactivityStartMillis = new 
AtomicLong(System.currentTimeMillis());
+    private final AtomicBoolean wasActive = new AtomicBoolean(true);
+
+    private volatile LocalFlowActivityInfo localFlowActivityInfo;
 
     @Override
     protected void init(final ProcessorInitializationContext context) {
@@ -191,6 +198,7 @@ public class MonitorActivity extends AbstractProcessor {
         properties.add(INACTIVITY_MESSAGE);
         properties.add(ACTIVITY_RESTORED_MESSAGE);
         properties.add(WAIT_FOR_ACTIVITY);
+        properties.add(RESET_STATE_ON_RESTART);
         properties.add(COPY_ATTRIBUTES);
         properties.add(MONITORING_SCOPE);
         properties.add(REPORTING_NODE);
@@ -217,224 +225,214 @@ public class MonitorActivity extends AbstractProcessor {
     public void onScheduled(final ProcessContext context) {
         // Check configuration.
         isClusterScope(context, true);
-        resetLastSuccessfulTransfer();
-        inactive.set(false);
-        hasSuccessTransfer.set(false);
-    }
-
 
-    protected void resetLastSuccessfulTransfer() {
-        setLastSuccessfulTransfer(System.currentTimeMillis());
-    }
-
-    protected final void setLastSuccessfulTransfer(final long timestamp) {
-        latestSuccessTransfer.set(timestamp);
-        latestReportedNodeState.set(timestamp);
-    }
-
-    protected final long getLatestSuccessTransfer() {
-        return latestSuccessTransfer.get();
-    }
-
-    private boolean isClusterScope(final ProcessContext context, boolean 
logInvalidConfig) {
-        if 
(SCOPE_CLUSTER.equals(context.getProperty(MONITORING_SCOPE).getValue())) {
-            if (getNodeTypeProvider().isConfiguredForClustering()) {
-                return true;
-            }
-            if (logInvalidConfig) {
-                getLogger().warn("NiFi is running as a Standalone mode, but 
'cluster' scope is set." +
-                        " Fallback to 'node' scope. Fix configuration to stop 
this message.");
-            }
+        final long thresholdMillis = 
context.getProperty(THRESHOLD).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+        final boolean copyAttributes = 
context.getProperty(COPY_ATTRIBUTES).asBoolean();
+        final boolean resetStateOnRestart = 
context.getProperty(RESET_STATE_ON_RESTART).asBoolean();
+
+        // Attempt to load last state by the time of stopping this processor. 
A local state only exists if
+        // the monitored flow was already inactive, when the processor was 
shutting down.
+        final String storedLastSuccessfulTransfer = resetStateOnRestart ? null 
: tryLoadLastSuccessfulTransfer(context);
+
+        if (storedLastSuccessfulTransfer != null) {
+            // Initialize local flow as being inactive since the stored 
timestamp.
+            localFlowActivityInfo = new LocalFlowActivityInfo(
+                    getStartupTime(), thresholdMillis, copyAttributes, 
Long.parseLong(storedLastSuccessfulTransfer));
+            wasActive.set(localFlowActivityInfo.isActive());
+            inactivityStartMillis.set(localFlowActivityInfo.getLastActivity());
+        } else {
+            // Initialize local flow as being active. If there is no traffic, 
then it will eventually become inactive.
+            localFlowActivityInfo = new LocalFlowActivityInfo(
+                    getStartupTime(), thresholdMillis, copyAttributes);
+            wasActive.set(true);
         }
-        return false;
     }
 
-    private boolean shouldReportOnlyOnPrimary(boolean isClusterScope, final 
ProcessContext context) {
-        if 
(REPORT_NODE_PRIMARY.equals(context.getProperty(REPORTING_NODE).getValue())) {
-            if (isClusterScope) {
-                return true;
+    @OnStopped
+    public void onStopped(final ProcessContext context) {
+        if (getNodeTypeProvider().isConfiguredForClustering() && 
context.isConnectedToCluster()) {
+            // Shared state needs to be cleared, in order to avoid getting 
inactive markers right after starting the
+            // flow after a weekend stop. In single-node setup, there is no 
shared state to be cleared, but the line
+            // below would also wipe out the local state. Hence, the check.
+            final StateManager stateManager = context.getStateManager();
+            try {
+                stateManager.clear(Scope.CLUSTER);
+            } catch (IOException e) {
+                getLogger().error("Failed to clear cluster state" + e, e);
             }
         }
-        return false;
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
-        final long thresholdMillis = 
context.getProperty(THRESHOLD).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
-        final long now = System.currentTimeMillis();
-
         final ComponentLog logger = getLogger();
-        final boolean copyAttributes = 
context.getProperty(COPY_ATTRIBUTES).asBoolean();
-        final boolean waitForActivity = 
context.getProperty(WAIT_FOR_ACTIVITY).asBoolean();
+
         final boolean isClusterScope = isClusterScope(context, false);
         final boolean isConnectedToCluster = context.isConnectedToCluster();
-        final boolean shouldReportOnlyOnPrimary = 
shouldReportOnlyOnPrimary(isClusterScope, context);
+        final boolean wasActive = this.wasActive.get();
+
         final List<FlowFile> flowFiles = session.get(50);
 
-        if (isClusterScope(context, true)) {
-            if (isReconnectedToCluster(isConnectedToCluster)) {
-                reconcileState(context);
-                connectedWhenLastTriggered.set(true);
-            } else if (!isConnectedToCluster) {
-                connectedWhenLastTriggered.set(false);
+        if (!flowFiles.isEmpty()) {
+            final boolean firstKnownTransfer = 
!localFlowActivityInfo.hasSuccessfulTransfer();
+            final boolean flowStateMustBecomeActive = !wasActive || 
firstKnownTransfer;
+
+            localFlowActivityInfo.update(flowFiles.get(0));
+
+            if (isClusterScope && flowStateMustBecomeActive) {
+                localFlowActivityInfo.forceSync();
             }
+
+            session.transfer(flowFiles, REL_SUCCESS);
+            logger.info("Transferred {} FlowFiles to 'success'", 
flowFiles.size());
+        } else {
+            context.yield();
         }
 
-        boolean isInactive = false;
-        long updatedLatestSuccessTransfer = -1;
-        StateMap clusterState = null;
-
-        if (flowFiles.isEmpty()) {
-            final long previousSuccessMillis = latestSuccessTransfer.get();
-
-            boolean sendInactiveMarker = false;
-
-            isInactive = (now >= previousSuccessMillis + thresholdMillis);
-            logger.debug("isInactive={}, previousSuccessMillis={}, now={}", 
new Object[]{isInactive, previousSuccessMillis, now});
-            if (isInactive && isClusterScope && isConnectedToCluster) {
-                // Even if this node has been inactive, there may be other 
nodes handling flow actively.
-                // However, if this node is active, we don't have to look at 
cluster state.
-                try {
-                    clusterState = session.getState(Scope.CLUSTER);
-                    if (clusterState != null && 
!StringUtils.isEmpty(clusterState.get(STATE_KEY_LATEST_SUCCESS_TRANSFER))) {
-                        final long latestReportedClusterActivity = 
Long.valueOf(clusterState.get(STATE_KEY_LATEST_SUCCESS_TRANSFER));
-                        isInactive = (now >= latestReportedClusterActivity + 
thresholdMillis);
-                        if (!isInactive) {
-                            // This node has been inactive, but other node has 
more recent activity.
-                            updatedLatestSuccessTransfer = 
latestReportedClusterActivity;
-                        }
-                        logger.debug("isInactive={}, 
latestReportedClusterActivity={}", new Object[]{isInactive, 
latestReportedClusterActivity});
-                    }
-                } catch (IOException e) {
-                    logger.error("Failed to access cluster state. Activity 
will not be monitored properly until this is addressed.", e);
-                }
+        if (isClusterScope) {
+            if (!wasActive || !localFlowActivityInfo.isActive()) {
+                localFlowActivityInfo.forceSync();
             }
+            synchronizeState(context);
+        }
 
-            if (isInactive) {
-                final boolean continual = 
context.getProperty(CONTINUALLY_SEND_MESSAGES).asBoolean();
-                sendInactiveMarker = !inactive.getAndSet(true) || (continual 
&& (now > lastInactiveMessage.get() + thresholdMillis));
-                if (waitForActivity) {
-                    sendInactiveMarker = sendInactiveMarker && 
hasSuccessTransfer.get();
-                }
+        final long thresholdMillis = 
context.getProperty(THRESHOLD).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+        final boolean continuallySendMessages = 
context.getProperty(CONTINUALLY_SEND_MESSAGES).asBoolean();
+        final boolean waitForActivity = 
context.getProperty(WAIT_FOR_ACTIVITY).asBoolean();
+
+        final boolean isActive = localFlowActivityInfo.isActive() || 
!flowFiles.isEmpty();
+        final long lastActivity = localFlowActivityInfo.getLastActivity();
+        final long inactivityStartMillis = this.inactivityStartMillis.get();
+        final boolean timeToRepeatInactiveMessage = (lastInactiveMessage.get() 
+ thresholdMillis) <= System.currentTimeMillis();
+
+        final boolean canReport = !isClusterScope || isConnectedToCluster || 
!flowFiles.isEmpty();
+        final boolean canChangeState = !waitForActivity || 
localFlowActivityInfo.hasSuccessfulTransfer();
+
+        if (canReport && canChangeState) {
+            if (isActive) {
+                onTriggerActiveFlow(context, session, wasActive, 
isClusterScope, inactivityStartMillis);
+            } else if (wasActive || continuallySendMessages && 
timeToRepeatInactiveMessage) {
+                onTriggerInactiveFlow(context, session, isClusterScope, 
lastActivity);
             }
+            this.wasActive.set(isActive);
+            this.inactivityStartMillis.set(lastActivity);
+        } else {
+            // We need to block state transition, because we are not connected 
to the cluster.
+            // When we reconnect, and the state persists, then the next 
onTrigger will do the transition.
+            logger.trace("State transition is blocked, because we are not 
connected to the cluster.");
+        }
+    }
 
-            if (sendInactiveMarker && shouldThisNodeReport(isClusterScope, 
shouldReportOnlyOnPrimary, context)) {
-                lastInactiveMessage.set(System.currentTimeMillis());
+    protected long getStartupTime() {
+        return System.currentTimeMillis();
+    }
 
-                FlowFile inactiveFlowFile = session.create();
-                inactiveFlowFile = session.putAttribute(inactiveFlowFile, 
"inactivityStartMillis", String.valueOf(previousSuccessMillis));
-                inactiveFlowFile = session.putAttribute(inactiveFlowFile, 
"inactivityDurationMillis", String.valueOf(now - previousSuccessMillis));
+    protected final long getLastSuccessfulTransfer() {
+        return localFlowActivityInfo.getLastSuccessfulTransfer();
+    }
 
-                final byte[] outBytes = 
context.getProperty(INACTIVITY_MESSAGE).evaluateAttributeExpressions(inactiveFlowFile).getValue().getBytes(UTF8);
-                inactiveFlowFile = session.write(inactiveFlowFile, new 
OutputStreamCallback() {
-                    @Override
-                    public void process(final OutputStream out) throws 
IOException {
-                        out.write(outBytes);
-                    }
-                });
+    private String tryLoadLastSuccessfulTransfer(ProcessContext context) {
+        final StateManager stateManager = context.getStateManager();
+        try {
+            final StateMap localStateMap = stateManager.getState(Scope.LOCAL);
+            return localStateMap.get(STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO);
+        } catch (IOException e) {
+            throw new ProcessException("Failed to load local state due to " + 
e, e);
+        }
+    }
 
-                session.getProvenanceReporter().create(inactiveFlowFile);
-                session.transfer(inactiveFlowFile, REL_INACTIVE);
-                logger.info("Transferred {} to 'inactive'", new 
Object[]{inactiveFlowFile});
-            } else {
-                context.yield();    // no need to dominate CPU checking times; 
let other processors run for a bit.
+    private void synchronizeState(ProcessContext context) {
+        final ComponentLog logger = getLogger();
+        final boolean isConnectedToCluster = context.isConnectedToCluster();
+
+        if (isReconnectedToCluster(isConnectedToCluster)) {
+            localFlowActivityInfo.forceSync();
+            connectedWhenLastTriggered.set(true);
+        }
+        if (!isConnectedToCluster) {
+            connectedWhenLastTriggered.set(false);
+        } else if (localFlowActivityInfo.syncNeeded()) {
+            final CommonFlowActivityInfo commonFlowActivityInfo = new 
CommonFlowActivityInfo(context);
+            localFlowActivityInfo.update(commonFlowActivityInfo);
+
+            try {
+                commonFlowActivityInfo.update(localFlowActivityInfo);
+                localFlowActivityInfo.setNextSyncMillis();
+            } catch (final SaveSharedFlowStateException ex) {
+                logger.debug("Failed to update common state.", ex);
             }
+        }
+    }
 
-        } else {
-            session.transfer(flowFiles, REL_SUCCESS);
-            hasSuccessTransfer.set(true);
-            updatedLatestSuccessTransfer = now;
-            logger.info("Transferred {} FlowFiles to 'success'", new 
Object[]{flowFiles.size()});
-
-            final long latestStateReportTimestamp = 
latestReportedNodeState.get();
-            if (isClusterScope
-                    && isConnectedToCluster
-                    && (now - latestStateReportTimestamp) > (thresholdMillis / 
3)) {
-                // We don't want to hit the state manager every onTrigger(), 
but often enough to detect activeness.
-                try {
-                    final StateMap state = session.getState(Scope.CLUSTER);
-
-                    final Map<String, String> newValues = new HashMap<>();
-
-                    // Persist attributes so that other nodes can copy it
-                    if (copyAttributes) {
-                        newValues.putAll(flowFiles.get(0).getAttributes());
-                    }
-                    newValues.put(STATE_KEY_LATEST_SUCCESS_TRANSFER, 
String.valueOf(now));
-
-                    if (state == null || state.getVersion() == -1) {
-                        session.setState(newValues, Scope.CLUSTER);
-                    } else {
-                        final String existingTimestamp = 
state.get(STATE_KEY_LATEST_SUCCESS_TRANSFER);
-                        if (StringUtils.isEmpty(existingTimestamp)
-                                || Long.parseLong(existingTimestamp) < now) {
-                            // If this returns false due to race condition, 
it's not a problem since we just need
-                            // the latest active timestamp.
-                            session.replaceState(state, newValues, 
Scope.CLUSTER);
-                        } else {
-                            logger.debug("Existing state has more recent 
timestamp, didn't update state.");
-                        }
-                    }
-                    latestReportedNodeState.set(now);
-                } catch (IOException e) {
-                    logger.error("Failed to access cluster state. Activity 
will not be monitored properly until this is addressed.", e);
-                }
+    private void onTriggerInactiveFlow(ProcessContext context, ProcessSession 
session, boolean isClusterScope, long lastActivity) {
+        final ComponentLog logger = getLogger();
+        final boolean shouldThisNodeReport = 
shouldThisNodeReport(isClusterScope, context);
+
+        if (shouldThisNodeReport) {
+            sendInactivityMarker(context, session, lastActivity, logger);
+        }
+        lastInactiveMessage.set(System.currentTimeMillis());
+        setInactivityFlag(context.getStateManager());
+    }
+
+    private void onTriggerActiveFlow(ProcessContext context, ProcessSession 
session, boolean wasActive, boolean isClusterScope,
+            long inactivityStartMillis) {
+        final ComponentLog logger = getLogger();
+        final boolean shouldThisNodeReport = 
shouldThisNodeReport(isClusterScope, context);
+
+        if (!wasActive) {
+            if (shouldThisNodeReport) {
+                final Map<String, String> attributes = 
localFlowActivityInfo.getLastSuccessfulTransferAttributes();
+                sendActivationMarker(context, session, attributes, 
inactivityStartMillis, logger);
             }
+            clearInactivityFlag(context.getStateManager());
         }
+    }
 
-        if (!isInactive) {
-            final long inactivityStartMillis = latestSuccessTransfer.get();
-            if (updatedLatestSuccessTransfer > -1) {
-                latestSuccessTransfer.set(updatedLatestSuccessTransfer);
+    private void setInactivityFlag(StateManager stateManager) {
+        try {
+            stateManager.setState(singletonMap(
+                    STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO,
+                    String.valueOf(localFlowActivityInfo.getLastActivity())
+            ), Scope.LOCAL);
+        } catch (IOException e) {
+            getLogger().error("Failed to set local state due to " + e, e);
+        }
+    }
+
+    private void clearInactivityFlag(StateManager stateManager) {
+        try {
+            stateManager.clear(Scope.LOCAL);
+        } catch (IOException e) {
+            throw new ProcessException("Failed to clear local state due to " + 
e, e);
+        }
+    }
+
+    private boolean isClusterScope(final ProcessContext context, boolean 
logInvalidConfig) {
+        if 
(SCOPE_CLUSTER.getValue().equals(context.getProperty(MONITORING_SCOPE).getValue()))
 {
+            if (getNodeTypeProvider().isConfiguredForClustering()) {
+                return true;
             }
-            if (inactive.getAndSet(false) && 
shouldThisNodeReport(isClusterScope, shouldReportOnlyOnPrimary, context)) {
-                FlowFile activityRestoredFlowFile = session.create();
-
-                if (copyAttributes) {
-
-                    final Map<String, String> attributes = new HashMap<>();
-                    if (flowFiles.size() > 0) {
-                        // copy attributes from the first flow file in the list
-                        attributes.putAll(flowFiles.get(0).getAttributes());
-                    } else if (clusterState != null) {
-                        attributes.putAll(clusterState.toMap());
-                        attributes.remove(STATE_KEY_LATEST_SUCCESS_TRANSFER);
-                    }
-                    // don't copy the UUID
-                    attributes.remove(CoreAttributes.UUID.key());
-                    activityRestoredFlowFile = 
session.putAllAttributes(activityRestoredFlowFile, attributes);
-                }
-
-                activityRestoredFlowFile = 
session.putAttribute(activityRestoredFlowFile, "inactivityStartMillis", 
String.valueOf(inactivityStartMillis));
-                activityRestoredFlowFile = 
session.putAttribute(activityRestoredFlowFile, "inactivityDurationMillis", 
String.valueOf(now - inactivityStartMillis));
-
-                final byte[] outBytes = 
context.getProperty(ACTIVITY_RESTORED_MESSAGE).evaluateAttributeExpressions(activityRestoredFlowFile).getValue().getBytes(UTF8);
-                activityRestoredFlowFile = 
session.write(activityRestoredFlowFile, out -> out.write(outBytes));
-
-                
session.getProvenanceReporter().create(activityRestoredFlowFile);
-                session.transfer(activityRestoredFlowFile, 
REL_ACTIVITY_RESTORED);
-                logger.info("Transferred {} to 'activity.restored'", new 
Object[]{activityRestoredFlowFile});
+            if (logInvalidConfig) {
+                getLogger().warn("NiFi is running as a Standalone mode, but 
'cluster' scope is set." +
+                        " Fallback to 'node' scope. Fix configuration to stop 
this message.");
             }
         }
+        return false;
     }
 
-    @OnStopped
-    public void onStopped(final ProcessContext context) {
-        if (getNodeTypeProvider().isPrimary()) {
-            final StateManager stateManager = context.getStateManager();
-            try {
-                stateManager.clear(Scope.CLUSTER);
-            } catch (IOException e) {
-                getLogger().error("Failed to clear cluster state due to " + e, 
e);
-            }
+    private boolean shouldReportOnlyOnPrimary(boolean isClusterScope, final 
ProcessContext context) {
+        if 
(REPORT_NODE_PRIMARY.getValue().equals(context.getProperty(REPORTING_NODE).getValue()))
 {
+            return isClusterScope;
         }
+        return false;
     }
 
     /**
      * Will return true when the last known state is "not connected" and the 
current state is "connected". This might
-     * happen when during last @OnTrigger the node was not connected but 
currently it is (reconnection); or when the
+     * happen when during last @OnTrigger the node was not connected, but 
currently it is (reconnection); or when the
      * processor is triggered first time (initial connection).
-     *
+     * <br />
      * This second case is due to safety reasons: it is possible that during 
the first trigger the node is not connected
      * to the cluster thus the default value of the #connected attribute is 
false and stays as false until it's proven
      * otherwise.
@@ -447,25 +445,203 @@ public class MonitorActivity extends AbstractProcessor {
         return !connectedWhenLastTriggered.get() && isConnectedToCluster;
     }
 
-    private void reconcileState(final ProcessContext context)  {
-        try {
-            final StateMap state = 
context.getStateManager().getState(Scope.CLUSTER);
-            final Map<String, String> newState = new HashMap<>();
-            newState.putAll(state.toMap());
+    private boolean shouldThisNodeReport(final boolean isClusterScope, final 
ProcessContext context) {
+        final boolean shouldReportOnlyOnPrimary = 
shouldReportOnlyOnPrimary(isClusterScope, context);
+        return !isClusterScope || (!shouldReportOnlyOnPrimary || 
getNodeTypeProvider().isPrimary());
+    }
 
-            final long validLastSuccessTransfer = 
StringUtils.isEmpty(state.get(STATE_KEY_LATEST_SUCCESS_TRANSFER))
-                    ? latestSuccessTransfer.get()
-                    : 
Math.max(Long.valueOf(state.get(STATE_KEY_LATEST_SUCCESS_TRANSFER)), 
latestSuccessTransfer.get());
+    private void sendInactivityMarker(ProcessContext context, ProcessSession 
session, long inactivityStartMillis,
+            ComponentLog logger) {
+        FlowFile inactiveFlowFile = session.create();
+        inactiveFlowFile = session.putAttribute(
+                inactiveFlowFile,
+                "inactivityStartMillis", String.valueOf(inactivityStartMillis)
+        );
+        inactiveFlowFile = session.putAttribute(
+                inactiveFlowFile,
+                "inactivityDurationMillis",
+                String.valueOf(System.currentTimeMillis() - 
inactivityStartMillis)
+        );
+
+        final byte[] outBytes = 
context.getProperty(INACTIVITY_MESSAGE).evaluateAttributeExpressions(inactiveFlowFile).getValue().getBytes(
+                StandardCharsets.UTF_8);
+        inactiveFlowFile = session.write(inactiveFlowFile, out -> 
out.write(outBytes));
+
+        session.getProvenanceReporter().create(inactiveFlowFile);
+        session.transfer(inactiveFlowFile, REL_INACTIVE);
+        logger.info("Transferred {} to 'inactive'", inactiveFlowFile);
+    }
 
-            newState.put(STATE_KEY_LATEST_SUCCESS_TRANSFER, 
String.valueOf(validLastSuccessTransfer));
-            context.getStateManager().replace(state, newState, Scope.CLUSTER);
-        } catch (IOException e) {
-            getLogger().error("Could not reconcile state after (re)connection! 
Reason: " + e.getMessage());
-            throw new ProcessException(e);
+    private void sendActivationMarker(ProcessContext context, ProcessSession 
session, Map<String, String> attributes,
+            long inactivityStartMillis, ComponentLog logger) {
+        FlowFile activityRestoredFlowFile = session.create();
+        // don't copy the UUID
+        attributes.remove(CoreAttributes.UUID.key());
+        activityRestoredFlowFile = 
session.putAllAttributes(activityRestoredFlowFile, attributes);
+
+        activityRestoredFlowFile = 
session.putAttribute(activityRestoredFlowFile, "inactivityStartMillis", 
String.valueOf(
+                inactivityStartMillis));
+        activityRestoredFlowFile = 
session.putAttribute(activityRestoredFlowFile, "inactivityDurationMillis", 
String.valueOf(System.currentTimeMillis() - inactivityStartMillis));
+
+        final byte[] outBytes = 
context.getProperty(ACTIVITY_RESTORED_MESSAGE).evaluateAttributeExpressions(activityRestoredFlowFile).getValue().getBytes(
+                StandardCharsets.UTF_8);
+        activityRestoredFlowFile = session.write(activityRestoredFlowFile, out 
-> out.write(outBytes));
+
+        session.getProvenanceReporter().create(activityRestoredFlowFile);
+        session.transfer(activityRestoredFlowFile, REL_ACTIVITY_RESTORED);
+        logger.info("Transferred {} to 'activity.restored'", 
activityRestoredFlowFile);
+    }
+
+    private static class LocalFlowActivityInfo {
+        private static final long NO_VALUE = 0;
+
+        private final long startupTimeMillis;
+        private final long thresholdMillis;
+        private final boolean saveAttributes;
+
+        private long nextSyncMillis = NO_VALUE;
+        private long lastSuccessfulTransfer = NO_VALUE;
+        private Map<String, String> lastSuccessfulTransferAttributes = new 
HashMap<>();
+
+        public LocalFlowActivityInfo(long startupTimeMillis, long 
thresholdMillis, boolean saveAttributes) {
+            this.startupTimeMillis = startupTimeMillis;
+            this.thresholdMillis = thresholdMillis;
+            this.saveAttributes = saveAttributes;
+        }
+
+        public LocalFlowActivityInfo(long startupTimeMillis, long 
thresholdMillis, boolean saveAttributes, long initialLastSuccessfulTransfer) {
+            this(startupTimeMillis, thresholdMillis, saveAttributes);
+            lastSuccessfulTransfer = initialLastSuccessfulTransfer;
+        }
+
+        public boolean syncNeeded() {
+            return nextSyncMillis <= System.currentTimeMillis();
+        }
+
+        public void setNextSyncMillis() {
+            nextSyncMillis = System.currentTimeMillis() + (thresholdMillis / 
3);
+        }
+
+        public void forceSync() {
+            nextSyncMillis = System.currentTimeMillis();
+        }
+
+        public boolean isActive() {
+            if (hasSuccessfulTransfer()) {
+                return System.currentTimeMillis() < (lastSuccessfulTransfer + 
thresholdMillis);
+            } else {
+                return System.currentTimeMillis() < (startupTimeMillis + 
thresholdMillis);
+            }
+        }
+
+        public boolean hasSuccessfulTransfer() {
+            return lastSuccessfulTransfer != NO_VALUE;
+        }
+
+        public long getLastSuccessfulTransfer() {
+            return lastSuccessfulTransfer;
+        }
+
+        public long getLastActivity() {
+            if (hasSuccessfulTransfer()) {
+                return lastSuccessfulTransfer;
+            } else {
+                return startupTimeMillis;
+            }
+        }
+
+        public Map<String, String> getLastSuccessfulTransferAttributes() {
+            return lastSuccessfulTransferAttributes;
+        }
+
+        public void update(FlowFile flowFile) {
+            this.lastSuccessfulTransfer = System.currentTimeMillis();
+            if (saveAttributes) {
+                lastSuccessfulTransferAttributes = new 
HashMap<>(flowFile.getAttributes());
+                
lastSuccessfulTransferAttributes.remove(CoreAttributes.UUID.key());
+            }
+        }
+
+        public void update(CommonFlowActivityInfo commonFlowActivityInfo) {
+            if (!commonFlowActivityInfo.hasSuccessfulTransfer()) {
+                return;
+            }
+
+            final long lastSuccessfulTransfer = 
commonFlowActivityInfo.getLastSuccessfulTransfer();
+
+            if (lastSuccessfulTransfer <= getLastSuccessfulTransfer()) {
+                return;
+            }
+
+            this.lastSuccessfulTransfer = lastSuccessfulTransfer;
+            if (saveAttributes) {
+                lastSuccessfulTransferAttributes = 
commonFlowActivityInfo.getLastSuccessfulTransferAttributes();
+            }
         }
     }
 
-    private boolean shouldThisNodeReport(final boolean isClusterScope, final 
boolean isReportOnlyOnPrimary, final ProcessContext context) {
-        return !isClusterScope || ((!isReportOnlyOnPrimary || 
getNodeTypeProvider().isPrimary()) && context.isConnectedToCluster());
+    private static class CommonFlowActivityInfo {
+        private final StateManager stateManager;
+        private final StateMap storedState;
+        private final Map<String, String> newState = new HashMap<>();
+
+        public CommonFlowActivityInfo(ProcessContext context) {
+            this.stateManager = context.getStateManager();
+            try {
+                storedState = stateManager.getState(Scope.CLUSTER);
+            } catch (IOException e) {
+                throw new ProcessException("Cannot load common flow activity 
info.", e);
+            }
+        }
+
+        public boolean hasSuccessfulTransfer() {
+            return storedState.get(STATE_KEY_COMMON_FLOW_ACTIVITY_INFO) != 
null;
+        }
+
+        public long getLastSuccessfulTransfer() {
+            return 
Long.parseLong(storedState.get(STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
+        }
+
+        public Map<String, String> getLastSuccessfulTransferAttributes() {
+            final Map<String, String> result = new 
HashMap<>(storedState.toMap());
+            result.remove(STATE_KEY_COMMON_FLOW_ACTIVITY_INFO);
+            return result;
+        }
+
+        public void update(LocalFlowActivityInfo localFlowActivityInfo) {
+            if (!localFlowActivityInfo.hasSuccessfulTransfer()) {
+                return;
+            }
+
+            final long lastSuccessfulTransfer = 
localFlowActivityInfo.getLastSuccessfulTransfer();
+
+            if (hasSuccessfulTransfer() && (lastSuccessfulTransfer <= 
getLastSuccessfulTransfer())) {
+                return;
+            }
+
+            
newState.putAll(localFlowActivityInfo.getLastSuccessfulTransferAttributes());
+            newState.put(STATE_KEY_COMMON_FLOW_ACTIVITY_INFO, 
String.valueOf(lastSuccessfulTransfer));
+
+            final boolean wasSuccessful;
+            try {
+                wasSuccessful = stateManager.replace(storedState, newState, 
Scope.CLUSTER);
+            } catch (IOException e) {
+                throw new SaveSharedFlowStateException("Caught exception while 
saving state.", e);
+            }
+
+            if (!wasSuccessful) {
+                throw new SaveSharedFlowStateException("Failed to save state. 
Probably there was a concurrent update.");
+            }
+        }
+    }
+
+    private static class SaveSharedFlowStateException extends ProcessException 
{
+        public SaveSharedFlowStateException(String message) {
+            super(message);
+        }
+
+        public SaveSharedFlowStateException(String message, Throwable cause) {
+            super(message, cause);
+        }
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java
index a38ca868b0..da5c288005 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java
@@ -16,15 +16,23 @@
  */
 package org.apache.nifi.processors.standard;
 
+import static java.lang.System.currentTimeMillis;
+import static java.util.Collections.singletonMap;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
 import java.io.IOException;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
-
 import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.util.MockFlowFile;
@@ -33,16 +41,10 @@ import org.apache.nifi.util.TestRunners;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
 public class TestMonitorActivity {
 
     @Test
-    public void testFirstMessage() {
+    public void testFirstMessage() throws InterruptedException {
         final TestableProcessor processor = new TestableProcessor(1000);
         final TestRunner runner = TestRunners.newTestRunner(processor);
         runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
@@ -53,7 +55,7 @@ public class TestMonitorActivity {
         runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS, 1);
         runner.clearTransferState();
 
-        processor.resetLastSuccessfulTransfer();
+        TimeUnit.MILLISECONDS.sleep(200);
 
         runNext(runner);
         runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE, 1);
@@ -78,7 +80,7 @@ public class TestMonitorActivity {
         runner.clearTransferState();
         runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "true");
 
-        processor.resetLastSuccessfulTransfer();
+        TimeUnit.MILLISECONDS.sleep(200);
         runNext(runner);
 
         runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
@@ -101,19 +103,24 @@ public class TestMonitorActivity {
     }
 
     @Test
-    public void testFirstMessageWithWaitForActivityTrue() {
+    public void testFirstMessageWithWaitForActivityTrue() throws 
InterruptedException {
         final TestableProcessor processor = new TestableProcessor(1000);
         final TestRunner runner = TestRunners.newTestRunner(processor);
         runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
         runner.setProperty(MonitorActivity.THRESHOLD, "100 millis");
         runner.setProperty(MonitorActivity.WAIT_FOR_ACTIVITY, "true");
 
+        runner.run(1, false);
+        TimeUnit.MILLISECONDS.sleep(200);
+        runNext(runner);
+        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
+
         runner.enqueue(new byte[0]);
-        runner.run();
+        runNext(runner);
         runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS, 1);
         runner.clearTransferState();
 
-        processor.resetLastSuccessfulTransfer();
+        TimeUnit.MILLISECONDS.sleep(200);
 
         runNext(runner);
         runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE, 1);
@@ -136,7 +143,7 @@ public class TestMonitorActivity {
         runner.clearTransferState();
         runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "true");
 
-        processor.resetLastSuccessfulTransfer();
+        TimeUnit.MILLISECONDS.sleep(200);
         runNext(runner);
 
         runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
@@ -157,7 +164,7 @@ public class TestMonitorActivity {
     }
     @Test
     public void testReconcileAfterFirstStartWhenLastSuccessIsAlreadySet() 
throws Exception {
-        final String lastSuccessInCluster = 
String.valueOf(System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(5));
+        final String lastSuccessInCluster = String.valueOf(currentTimeMillis() 
- TimeUnit.MINUTES.toMillis(5));
         final TestRunner runner = TestRunners.newTestRunner(new 
TestableProcessor(0));
         runner.setIsConfiguredForClustering(true);
         runner.setPrimaryNode(true);
@@ -165,14 +172,175 @@ public class TestMonitorActivity {
         runner.setProperty(MonitorActivity.REPORTING_NODE, 
MonitorActivity.REPORT_NODE_PRIMARY);
         runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
         runner.setProperty(MonitorActivity.THRESHOLD, "5 secs");
-        
runner.getStateManager().setState(Collections.singletonMap(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER,
 lastSuccessInCluster), Scope.CLUSTER);
+        runner.getStateManager().setState(
+                
singletonMap(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO, 
lastSuccessInCluster), Scope.CLUSTER);
 
         runner.enqueue("lorem ipsum");
         runner.run(1, false);
 
         runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
         final StateMap updatedState = 
runner.getStateManager().getState(Scope.CLUSTER);
-        assertNotEquals(lastSuccessInCluster, 
updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+        assertNotEquals(lastSuccessInCluster, 
updatedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
+    }
+
+    @Test
+    public void 
testReconcileAfterFirstStartWhenLastSuccessIsAlreadySetAndNoInput() throws 
Exception {
+        final String lastSuccessInCluster = String.valueOf(currentTimeMillis() 
- TimeUnit.MINUTES.toMillis(5));
+        final TestRunner runner = TestRunners.newTestRunner(new 
TestableProcessor(0));
+        runner.setIsConfiguredForClustering(true);
+        runner.setPrimaryNode(true);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, 
MonitorActivity.SCOPE_CLUSTER);
+        runner.setProperty(MonitorActivity.REPORTING_NODE, 
MonitorActivity.REPORT_NODE_PRIMARY);
+        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
+        runner.setProperty(MonitorActivity.THRESHOLD, "5 secs");
+        runner.getStateManager().setState(
+                
singletonMap(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO, 
lastSuccessInCluster), Scope.CLUSTER);
+
+        runner.run(1, false);
+
+        final StateMap updatedState = 
runner.getStateManager().getState(Scope.CLUSTER);
+        assertEquals(lastSuccessInCluster, 
updatedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
+
+        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
+        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
+        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
+    }
+
+    @Test
+    public void 
testReconcileAfterFirstStartWhenLastSuccessIsAlreadySetAndNoInputButClusterIsActive()
 throws Exception {
+        final String lastSuccessInCluster = 
String.valueOf(currentTimeMillis());
+        final TestRunner runner = TestRunners.newTestRunner(new 
TestableProcessor(0));
+        runner.setIsConfiguredForClustering(true);
+        runner.setPrimaryNode(true);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, 
MonitorActivity.SCOPE_CLUSTER);
+        runner.setProperty(MonitorActivity.REPORTING_NODE, 
MonitorActivity.REPORT_NODE_PRIMARY);
+        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
+        runner.setProperty(MonitorActivity.THRESHOLD, "5 minutes");
+        runner.getStateManager().setState(
+                
singletonMap(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO, 
lastSuccessInCluster), Scope.CLUSTER);
+
+        runner.run(1, false);
+
+        final StateMap updatedState = 
runner.getStateManager().getState(Scope.CLUSTER);
+        assertEquals(lastSuccessInCluster, 
updatedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
+
+        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
+        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
+        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
+    }
+
+    @Test
+    public void 
testReconcileAfterFirstStartWhenLastSuccessIsAlreadySetAndNoInputAndWasInactiveLastTime()
 throws Exception {
+        final String lastSuccessInCluster = String.valueOf(currentTimeMillis() 
- TimeUnit.MINUTES.toMillis(5));
+        final TestRunner runner = TestRunners.newTestRunner(new 
TestableProcessor(0));
+        runner.setIsConfiguredForClustering(true);
+        runner.setPrimaryNode(true);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, 
MonitorActivity.SCOPE_CLUSTER);
+        runner.setProperty(MonitorActivity.REPORTING_NODE, 
MonitorActivity.REPORT_NODE_PRIMARY);
+        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
+        runner.setProperty(MonitorActivity.THRESHOLD, "5 secs");
+        runner.getStateManager().setState(
+                
singletonMap(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO, 
lastSuccessInCluster), Scope.CLUSTER);
+
+        runner.setProperty(MonitorActivity.RESET_STATE_ON_RESTART, 
Boolean.FALSE.toString());
+        runner.getStateManager().setState(
+                
singletonMap(MonitorActivity.STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO, 
lastSuccessInCluster), Scope.LOCAL
+        );
+
+        runner.run(1, false);
+
+        final StateMap updatedState = 
runner.getStateManager().getState(Scope.CLUSTER);
+        assertEquals(lastSuccessInCluster, 
updatedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
+        final StateMap updatedLocalState = 
runner.getStateManager().getState(Scope.LOCAL);
+        assertEquals(lastSuccessInCluster, 
updatedLocalState.get(MonitorActivity.STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO));
+
+        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
+        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
+        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
+    }
+
+    @Test
+    public void 
testReconcileAfterFirstStartWhenLastSuccessIsAlreadySetAndNoInputAndWasActiveLastTime()
 throws Exception {
+        final String lastSuccessInCluster = 
String.valueOf(currentTimeMillis());
+        final TestRunner runner = TestRunners.newTestRunner(new 
TestableProcessor(0));
+        runner.setIsConfiguredForClustering(true);
+        runner.setPrimaryNode(true);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, 
MonitorActivity.SCOPE_CLUSTER);
+        runner.setProperty(MonitorActivity.REPORTING_NODE, 
MonitorActivity.REPORT_NODE_PRIMARY);
+        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
+        runner.setProperty(MonitorActivity.THRESHOLD, "5 minutes");
+        runner.getStateManager().setState(
+                
singletonMap(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO, 
lastSuccessInCluster), Scope.CLUSTER);
+
+        runner.setProperty(MonitorActivity.RESET_STATE_ON_RESTART, 
Boolean.FALSE.toString());
+        // if it was active, there is no local state
+
+        runner.run(1, false);
+
+        final StateMap updatedState = 
runner.getStateManager().getState(Scope.CLUSTER);
+        assertEquals(lastSuccessInCluster, 
updatedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
+
+        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
+        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
+        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
+    }
+
+    @Test
+    public void 
testReconcileAfterFirstStartWhenLastSuccessIsAlreadySetAndNoInputAndTurnedInactiveSinceLastTime()
 throws Exception {
+        final String lastSuccessInCluster = String.valueOf(currentTimeMillis() 
- TimeUnit.MINUTES.toMillis(5));
+        final TestRunner runner = TestRunners.newTestRunner(new 
TestableProcessor(0));
+        runner.setIsConfiguredForClustering(true);
+        runner.setPrimaryNode(true);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, 
MonitorActivity.SCOPE_CLUSTER);
+        runner.setProperty(MonitorActivity.REPORTING_NODE, 
MonitorActivity.REPORT_NODE_PRIMARY);
+        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
+        runner.setProperty(MonitorActivity.THRESHOLD, "5 secs");
+        runner.getStateManager().setState(
+                
singletonMap(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO, 
lastSuccessInCluster), Scope.CLUSTER);
+
+        runner.setProperty(MonitorActivity.RESET_STATE_ON_RESTART, 
Boolean.FALSE.toString());
+        // if it was active, there is no local state
+
+        runner.run(1, false);
+
+        final StateMap updatedClusterState = 
runner.getStateManager().getState(Scope.CLUSTER);
+        assertEquals(lastSuccessInCluster, 
updatedClusterState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
+        final StateMap updatedLocalState = 
runner.getStateManager().getState(Scope.LOCAL);
+        assertEquals(lastSuccessInCluster, 
updatedLocalState.get(MonitorActivity.STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO));
+
+        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
+        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
+        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
+    }
+
+    @Test
+    public void 
testReconcileAfterFirstStartWhenLastSuccessIsAlreadySetAndNoInputAndTurnedActiveSinceLastTime()
 throws Exception {
+        final String lastSuccessInCluster = 
String.valueOf(currentTimeMillis());
+        final String lastSuccessInLocal = String.valueOf(currentTimeMillis() - 
TimeUnit.MINUTES.toMillis(5));
+        final TestRunner runner = TestRunners.newTestRunner(new 
TestableProcessor(0));
+        runner.setIsConfiguredForClustering(true);
+        runner.setPrimaryNode(true);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, 
MonitorActivity.SCOPE_CLUSTER);
+        runner.setProperty(MonitorActivity.REPORTING_NODE, 
MonitorActivity.REPORT_NODE_PRIMARY);
+        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
+        runner.setProperty(MonitorActivity.THRESHOLD, "5 secs");
+        runner.getStateManager().setState(
+                
singletonMap(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO, 
lastSuccessInCluster), Scope.CLUSTER);
+
+        runner.setProperty(MonitorActivity.RESET_STATE_ON_RESTART, 
Boolean.FALSE.toString());
+        runner.getStateManager().setState(
+                
singletonMap(MonitorActivity.STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO, 
lastSuccessInLocal), Scope.LOCAL
+        );
+
+        runner.run(1, false);
+
+        final StateMap updatedState = 
runner.getStateManager().getState(Scope.CLUSTER);
+        assertEquals(lastSuccessInCluster, 
updatedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
+        
assertNull(runner.getStateManager().getState(Scope.LOCAL).get(MonitorActivity.STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO));
+
+        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
+        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
+        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
     }
 
     @Test
@@ -188,33 +356,41 @@ public class TestMonitorActivity {
 
         runner.setConnected(false);
         runner.enqueue("lorem ipsum");
-        runner.run(1, false, false);
+        runner.run(1, false);
 
         runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
 
         runner.setConnected(true);
-        runner.run(1, false, false);
+        runNext(runner);
 
-        final long tLocal = processor.getLatestSuccessTransfer();
+        final long tLocal = processor.getLastSuccessfulTransfer();
         final long tCluster = getLastSuccessFromCluster(runner);
         assertEquals(tLocal, tCluster);
     }
 
     @Test
-    public void testReconcileAfterReconnectWhenPrimary() throws 
InterruptedException {
+    public void testReconcileAfterReconnectWhenPrimary() throws 
InterruptedException, IOException {
         final TestRunner runner = getRunnerScopeCluster(new MonitorActivity(), 
true);
+        final StateManager stateManager = runner.getStateManager();
 
         // First trigger will write last success transfer into cluster.
         runner.enqueue("lorem ipsum");
-        runNext(runner);
+        runner.run(1, false);
+
+        final String lastSuccessTransferAfterFirstTrigger = 
stateManager.getState(Scope.CLUSTER)
+                .get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO);
 
         assertTransferCountSuccessInactiveRestored(runner, 1, 0);
 
         // At second trigger it's not connected, new last success transfer 
stored only locally.
         runner.setConnected(false);
         runner.enqueue("lorem ipsum");
+        TimeUnit.MILLISECONDS.sleep(500); // This sleep is needed to 
guarantee that the stored timestamp will be different.
         runNext(runner);
 
+        assertEquals(lastSuccessTransferAfterFirstTrigger,
+                
stateManager.getState(Scope.CLUSTER).get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
+
         assertTransferCountSuccessInactiveRestored(runner, 2, 0);
 
         // The third trigger is without flow file, but reconcile is triggered 
and value is written ot cluster.
@@ -222,32 +398,46 @@ public class TestMonitorActivity {
         TimeUnit.MILLISECONDS.sleep(500);
         runNext(runner);
 
+        assertNotEquals(lastSuccessTransferAfterFirstTrigger,
+                
stateManager.getState(Scope.CLUSTER).get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
+
         // Inactive message is being sent after the connection is back.
         assertTransferCountSuccessInactiveRestored(runner,2, 1);
     }
 
     @Test
-    public void testReconcileAfterReconnectWhenNotPrimary() {
+    public void testReconcileAfterReconnectWhenNotPrimary() throws 
IOException, InterruptedException {
         final TestableProcessor processor = new TestableProcessor(1000);
         final TestRunner runner = getRunnerScopeCluster(processor, false);
+        final StateManager stateManager = runner.getStateManager();
 
         // First trigger will write last success transfer into cluster.
         runner.enqueue("lorem ipsum");
-        runNext(runner);
+        runner.run(1, false);
+
+        final String lastSuccessTransferAfterFirstTrigger = 
stateManager.getState(Scope.CLUSTER)
+                .get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO);
 
         assertTransferCountSuccessInactiveRestored(runner, 1, 0);
 
         // At second trigger it's not connected, new last success transfer 
stored only locally.
         runner.setConnected(false);
         runner.enqueue("lorem ipsum");
+        TimeUnit.MILLISECONDS.sleep(500); // This sleep is needed to 
guarantee that the stored timestamp will be different.
         runNext(runner);
 
+        assertEquals(lastSuccessTransferAfterFirstTrigger,
+                
stateManager.getState(Scope.CLUSTER).get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
+
         assertTransferCountSuccessInactiveRestored(runner, 2, 0);
 
         // The third trigger is without flow file, but reconcile is triggered 
and value is written ot cluster.
         runner.setConnected(true);
         runNext(runner);
 
+        assertNotEquals(lastSuccessTransferAfterFirstTrigger,
+                
stateManager.getState(Scope.CLUSTER).get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
+
         // No inactive message because of the node is not primary
         assertTransferCountSuccessInactiveRestored(runner, 2, 0);
     }
@@ -269,7 +459,7 @@ public class TestMonitorActivity {
     }
 
     private Long getLastSuccessFromCluster(final TestRunner runner) throws 
IOException {
-        return 
Long.valueOf(runner.getStateManager().getState(Scope.CLUSTER).get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+        return 
Long.valueOf(runner.getStateManager().getState(Scope.CLUSTER).get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
     }
 
     private void assertTransferCountSuccessInactiveRestored(TestRunner runner, 
final int success, final int inactive) {
@@ -303,7 +493,7 @@ public class TestMonitorActivity {
     }
 
     @Test
-    public void testFirstMessageWithInherit() {
+    public void testFirstMessageWithInherit() throws InterruptedException {
         final TestableProcessor processor = new TestableProcessor(1000);
         final TestRunner runner = TestRunners.newTestRunner(processor);
         runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
@@ -311,12 +501,12 @@ public class TestMonitorActivity {
         runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
 
         runner.enqueue(new byte[0]);
-        runner.run();
+        runner.run(1, false);
         runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS, 1);
         MockFlowFile originalFlowFile = 
runner.getFlowFilesForRelationship(MonitorActivity.REL_SUCCESS).get(0);
         runner.clearTransferState();
 
-        processor.resetLastSuccessfulTransfer();
+        TimeUnit.MILLISECONDS.sleep(200);
 
         runNext(runner);
         runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE, 1);
@@ -346,7 +536,7 @@ public class TestMonitorActivity {
         runner.clearTransferState();
         runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "true");
 
-        processor.resetLastSuccessfulTransfer();
+        TimeUnit.MILLISECONDS.sleep(200);
         runNext(runner);
 
         runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
@@ -383,7 +573,7 @@ public class TestMonitorActivity {
             rerun = false;
             runner.setProperty(MonitorActivity.THRESHOLD, threshold + " 
millis");
 
-            Thread.sleep(1000L);
+            TimeUnit.MILLISECONDS.sleep(1000L);
 
             // shouldn't generate inactivity b/c run() will reset the 
lastSuccessfulTransfer if @OnSchedule & onTrigger
             // does not  get called more than MonitorActivity.THRESHOLD apart
@@ -410,30 +600,18 @@ public class TestMonitorActivity {
      */
     private static class TestableProcessor extends MonitorActivity {
 
-        private final long timestampDifference;
+        private final long startupTime;
 
         public TestableProcessor(final long timestampDifference) {
-            this.timestampDifference = timestampDifference;
+            this.startupTime = currentTimeMillis() - timestampDifference;
         }
 
         @Override
-        public void resetLastSuccessfulTransfer() {
-            setLastSuccessfulTransfer(System.currentTimeMillis() - 
timestampDifference);
+        protected long getStartupTime() {
+            return startupTime;
         }
     }
 
-    @Test
-    public void testClusterMonitorInvalidReportingNode() {
-        final TestRunner runner = TestRunners.newTestRunner(new 
TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
-
-        runner.setIsConfiguredForClustering(true);
-        runner.setPrimaryNode(false);
-        runner.setProperty(MonitorActivity.MONITORING_SCOPE, 
MonitorActivity.SCOPE_NODE);
-        runner.setProperty(MonitorActivity.REPORTING_NODE, 
MonitorActivity.REPORT_NODE_PRIMARY);
-
-        runner.assertNotValid();
-    }
-
     @Test
     public void testClusterMonitorActive() throws Exception {
         final TestRunner runner = TestRunners.newTestRunner(new 
TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
@@ -445,12 +623,12 @@ public class TestMonitorActivity {
 
         runner.enqueue("Incoming data");
 
-        runner.run();
+        runner.run(1, false);
 
         runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
 
         final StateMap updatedState = 
runner.getStateManager().getState(Scope.CLUSTER);
-        
assertNotNull(updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+        
assertNotNull(updatedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
         // Should be null because COPY_ATTRIBUTES is null.
         assertNull(updatedState.get("key1"));
         assertNull(updatedState.get("key2"));
@@ -472,7 +650,7 @@ public class TestMonitorActivity {
         runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
 
         final StateMap updatedState = 
runner.getStateManager().getState(Scope.CLUSTER);
-        
assertNull(updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+        
assertNull(updatedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
     }
 
     @Test
@@ -488,21 +666,22 @@ public class TestMonitorActivity {
 
         // Set future timestamp in state
         final HashMap<String, String> existingState = new HashMap<>();
-        final long existingTimestamp = System.currentTimeMillis() - 1_000;
-        existingState.put(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER,
+        final long existingTimestamp = currentTimeMillis() - 1_000;
+        existingState.put(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO,
                 String.valueOf(existingTimestamp));
         existingState.put("key1", "value1");
         existingState.put("key2", "value2");
         runner.getStateManager().setState(existingState, Scope.CLUSTER);
         
runner.getStateManager().replace(runner.getStateManager().getState(Scope.CLUSTER),
 existingState, Scope.CLUSTER);
 
-        runner.run();
+        runner.run(1, false);
 
         runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
 
         final StateMap postProcessedState = 
runner.getStateManager().getState(Scope.CLUSTER);
-        assertTrue(                existingTimestamp < 
Long.parseLong(postProcessedState.get(
-                MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER)));
+        long postProcessedTimestamp = Long.parseLong(postProcessedState.get(
+                MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
+        assertTrue(existingTimestamp < postProcessedTimestamp);
         // State should be updated. Null in this case.
         assertNull(postProcessedState.get("key1"));
         assertNull(postProcessedState.get("key2"));
@@ -521,22 +700,22 @@ public class TestMonitorActivity {
 
         // Set future timestamp in state
         final HashMap<String, String> existingState = new HashMap<>();
-        final long existingTimestamp = System.currentTimeMillis() + 10_000;
-        existingState.put(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER,
+        final long existingTimestamp = currentTimeMillis() + 10_000;
+        existingState.put(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO,
                 String.valueOf(existingTimestamp));
         existingState.put("key1", "value1");
         existingState.put("key2", "value2");
         runner.getStateManager().setState(existingState, Scope.CLUSTER);
         
runner.getStateManager().replace(runner.getStateManager().getState(Scope.CLUSTER),
 existingState, Scope.CLUSTER);
 
-        runner.run();
+        runner.run(1, false);
 
         runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
 
         final StateMap postProcessedState = 
runner.getStateManager().getState(Scope.CLUSTER);
         assertEquals(
                 String.valueOf(existingTimestamp),
-                
postProcessedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+                
postProcessedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
         // State should stay the same.
         assertEquals(postProcessedState.get("key1"), 
existingState.get("key1"));
         assertEquals(postProcessedState.get("key2"), 
existingState.get("key2"));
@@ -557,12 +736,12 @@ public class TestMonitorActivity {
         attributes.put("key2", "value2");
         runner.enqueue("Incoming data", attributes);
 
-        runner.run();
+        runner.run(1, false);
 
         runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
 
         final StateMap updatedState = 
runner.getStateManager().getState(Scope.CLUSTER);
-        
assertNotNull(updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+        
assertNotNull(updatedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
         assertEquals("value1", updatedState.get("key1"));
         assertEquals("value2", updatedState.get("key2"));
     }
@@ -689,7 +868,7 @@ public class TestMonitorActivity {
 
         // Latest activity should be persisted
         final StateMap updatedState = 
runner.getStateManager().getState(Scope.CLUSTER);
-        
assertNotNull(updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+        
assertNotNull(updatedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
         assertEquals("value1", updatedState.get("key1"));
         assertEquals("value2", updatedState.get("key2"));
         runner.clearTransferState();
@@ -723,7 +902,7 @@ public class TestMonitorActivity {
 
         // Latest activity should be persisted
         final StateMap updatedState = 
runner.getStateManager().getState(Scope.CLUSTER);
-        
assertNotNull(updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+        
assertNotNull(updatedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
         assertEquals("value1", updatedState.get("key1"));
         assertEquals("value2", updatedState.get("key2"));
         runner.clearTransferState();
@@ -762,7 +941,7 @@ public class TestMonitorActivity {
 
         // Latest activity should be persisted
         final StateMap updatedState = 
runner.getStateManager().getState(Scope.CLUSTER);
-        
assertNotNull(updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+        
assertNotNull(updatedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
         assertEquals("value1", updatedState.get("key1"));
         assertEquals("value2", updatedState.get("key2"));
         runner.clearTransferState();
@@ -801,7 +980,7 @@ public class TestMonitorActivity {
 
         // Latest activity should NOT be persisted
         final StateMap updatedState = 
runner.getStateManager().getState(Scope.CLUSTER);
-        
assertNull(updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+        
assertNull(updatedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
         runner.clearTransferState();
     }
 
@@ -812,22 +991,25 @@ public class TestMonitorActivity {
         runner.setIsConfiguredForClustering(true);
         runner.setPrimaryNode(false);
         runner.setProperty(MonitorActivity.MONITORING_SCOPE, 
MonitorActivity.SCOPE_CLUSTER);
-        runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
+        runner.setProperty(MonitorActivity.THRESHOLD, "10 sec");
         runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
 
         // Becomes inactive
-        runner.run();
+        runner.run(1, false);
         runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
         runner.clearTransferState();
 
         // Activity restored, even if this node doesn't have activity, other 
node updated the cluster state.
         final HashMap<String, String> clusterState = new HashMap<>();
-        clusterState.put(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER, 
String.valueOf(System.currentTimeMillis()));
+        clusterState.put(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO, 
String.valueOf(currentTimeMillis()));
         clusterState.put("key1", "value1");
         clusterState.put("key2", "value2");
         runner.getStateManager().setState(clusterState, Scope.CLUSTER);
         
runner.getStateManager().replace(runner.getStateManager().getState(Scope.CLUSTER),
 clusterState, Scope.CLUSTER);
 
+        // Common state is not sampled on each trigger. We need to wait a 
little to get notified about the update.
+        TimeUnit.MILLISECONDS.sleep(3334); // Sampling rate is threshold/3
+
         runNext(runner);
         final List<MockFlowFile> successFiles = 
runner.getFlowFilesForRelationship(MonitorActivity.REL_SUCCESS);
         final List<MockFlowFile> activityRestoredFiles = 
runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED);
@@ -849,22 +1031,25 @@ public class TestMonitorActivity {
         runner.setPrimaryNode(true);
         runner.setProperty(MonitorActivity.MONITORING_SCOPE, 
MonitorActivity.SCOPE_CLUSTER);
         runner.setProperty(MonitorActivity.REPORTING_NODE, 
MonitorActivity.REPORT_NODE_PRIMARY);
-        runner.setProperty(MonitorActivity.THRESHOLD, "1 hour");
+        runner.setProperty(MonitorActivity.THRESHOLD, "10 sec");
         runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
 
         // Becomes inactive
-        runner.run();
+        runner.run(1, false);
         runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
         runner.clearTransferState();
 
         // Activity restored, even if this node doesn't have activity, other 
node updated the cluster state.
         final HashMap<String, String> clusterState = new HashMap<>();
-        clusterState.put(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER, 
String.valueOf(System.currentTimeMillis()));
+        clusterState.put(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO, 
String.valueOf(currentTimeMillis()));
         clusterState.put("key1", "value1");
         clusterState.put("key2", "value2");
         runner.getStateManager().setState(clusterState, Scope.CLUSTER);
         
runner.getStateManager().replace(runner.getStateManager().getState(Scope.CLUSTER),
 clusterState, Scope.CLUSTER);
 
+        // Common state is not sampled on each trigger. We need to wait a 
little to get notified about the update.
+        TimeUnit.MILLISECONDS.sleep(3334); // Sampling rate is threshold/3
+
         runNext(runner);
         final List<MockFlowFile> successFiles = 
runner.getFlowFilesForRelationship(MonitorActivity.REL_SUCCESS);
         final List<MockFlowFile> activityRestoredFiles = 
runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED);
@@ -892,7 +1077,7 @@ public class TestMonitorActivity {
 
         // Activity restored, even if this node doesn't have activity, other 
node updated the cluster state.
         final HashMap<String, String> clusterState = new HashMap<>();
-        clusterState.put(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER, 
String.valueOf(System.currentTimeMillis()));
+        clusterState.put(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO, 
String.valueOf(currentTimeMillis()));
         clusterState.put("key1", "value1");
         clusterState.put("key2", "value2");
         runner.getStateManager().setState(clusterState, Scope.CLUSTER);
@@ -906,4 +1091,288 @@ public class TestMonitorActivity {
 
     }
 
-}
\ No newline at end of file
+    @Test
+    public void testDisconnectedNodeActivatesFlow() {
+        final TestRunner runner = TestRunners.newTestRunner(new 
TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
+        runner.setIsConfiguredForClustering(true);
+        runner.setConnected(true);
+        runner.setPrimaryNode(false);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, 
MonitorActivity.SCOPE_CLUSTER);
+        runner.setProperty(MonitorActivity.REPORTING_NODE, 
MonitorActivity.REPORT_NODE_ALL);
+        runner.setProperty(MonitorActivity.THRESHOLD, "1 min");
+
+        // Becomes inactive
+        runner.run(1, false);
+        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
+        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
+        runner.clearTransferState();
+
+        // Disconnect the node, and feed in a flow file
+        runner.setConnected(false);
+        runner.enqueue("Incoming data");
+        runNext(runner);
+
+        // We expect both activation marker, and forwarded FF
+        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1);
+        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
+    }
+
+    @Test
+    public void testDisconnectedNodeDeactivatesFlowOnlyWhenConnected() {
+        final TestRunner runner = TestRunners.newTestRunner(new 
TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
+        runner.setIsConfiguredForClustering(true);
+        runner.setConnected(false);
+        runner.setPrimaryNode(false);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, 
MonitorActivity.SCOPE_CLUSTER);
+        runner.setProperty(MonitorActivity.REPORTING_NODE, 
MonitorActivity.REPORT_NODE_ALL);
+        runner.setProperty(MonitorActivity.THRESHOLD, "1 min");
+
+        // Inactivity is not reported while the node is disconnected
+        runner.run(1, false);
+        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
+        runner.clearTransferState();
+
+        // Reconnect the node, and expect marker
+        runner.setConnected(true);
+        runNext(runner);
+        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
+        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
+    }
+
+    @Test
+    public void testLocalStateIsNotDeletedInStandaloneCaseWhenStopped() throws 
IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new 
TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
+        runner.setIsConfiguredForClustering(false);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, 
MonitorActivity.SCOPE_NODE);
+        runner.setProperty(MonitorActivity.REPORTING_NODE, 
MonitorActivity.REPORT_NODE_ALL);
+        runner.setProperty(MonitorActivity.THRESHOLD, "1 min");
+
+        // Becomes inactive
+        runner.run(1, false);
+        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
+        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
+        
assertFalse(runner.getStateManager().getState(Scope.LOCAL).toMap().isEmpty());
+
+        // Stop the processor and expect the local state still there
+        runner.stop();
+        
assertFalse(runner.getStateManager().getState(Scope.LOCAL).toMap().isEmpty());
+    }
+
+    @Test
+    public void 
testLocalStateIsNotDeletedInClusteredCaseNodeScopeWhenStopped() throws 
IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new 
TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
+        runner.setIsConfiguredForClustering(true);
+        runner.setConnected(true);
+        runner.setPrimaryNode(true);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, 
MonitorActivity.SCOPE_NODE);
+        runner.setProperty(MonitorActivity.REPORTING_NODE, 
MonitorActivity.REPORT_NODE_ALL);
+        runner.setProperty(MonitorActivity.THRESHOLD, "1 min");
+
+        // Becomes inactive
+        runner.run(1, false);
+        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
+        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
+        
assertFalse(runner.getStateManager().getState(Scope.LOCAL).toMap().isEmpty());
+
+        // Stop the processor and expect the local state still there
+        runner.stop();
+        
assertFalse(runner.getStateManager().getState(Scope.LOCAL).toMap().isEmpty());
+    }
+
+    @Test
+    public void 
testLocalStateIsNotDeletedInClusteredCaseClusterScopeWhenStopped() throws 
IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new 
TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
+        runner.setIsConfiguredForClustering(true);
+        runner.setConnected(true);
+        runner.setPrimaryNode(true);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, 
MonitorActivity.SCOPE_CLUSTER);
+        runner.setProperty(MonitorActivity.REPORTING_NODE, 
MonitorActivity.REPORT_NODE_ALL);
+        runner.setProperty(MonitorActivity.THRESHOLD, "1 min");
+
+        // Becomes inactive
+        runner.run(1, false);
+        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
+        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
+        
assertFalse(runner.getStateManager().getState(Scope.LOCAL).toMap().isEmpty());
+
+        // Stop the processor and expect the local state still there
+        runner.stop();
+        
assertFalse(runner.getStateManager().getState(Scope.LOCAL).toMap().isEmpty());
+    }
+
+    @Test
+    public void 
testLocalStateIsNotDeletedInClusteredCaseWhenDisconnectedAndStopped() throws 
IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new 
TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
+        runner.setIsConfiguredForClustering(true);
+        runner.setConnected(true);
+        runner.setPrimaryNode(true);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, 
MonitorActivity.SCOPE_CLUSTER);
+        runner.setProperty(MonitorActivity.REPORTING_NODE, 
MonitorActivity.REPORT_NODE_ALL);
+        runner.setProperty(MonitorActivity.THRESHOLD, "1 min");
+
+        // Becomes inactive
+        runner.run(1, false);
+        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
+        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
+        
assertFalse(runner.getStateManager().getState(Scope.LOCAL).toMap().isEmpty());
+
+        // Disconnect the node & stop the processor and expect the local state 
still there
+        runner.setConnected(false);
+        runner.stop();
+        
assertFalse(runner.getStateManager().getState(Scope.LOCAL).toMap().isEmpty());
+    }
+
+    @Test
+    public void 
testActivationMarkerIsImmediateWhenAnyOtherNodeActivatesTheFlow() throws 
IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new 
TestableProcessor(TimeUnit.DAYS.toMillis(1)));
+        runner.setIsConfiguredForClustering(true);
+        runner.setConnected(true);
+        runner.setPrimaryNode(true);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, 
MonitorActivity.SCOPE_CLUSTER);
+        runner.setProperty(MonitorActivity.REPORTING_NODE, 
MonitorActivity.REPORT_NODE_PRIMARY);
+        runner.setProperty(MonitorActivity.THRESHOLD, "3 hours");
+
+        // Becomes inactive
+        runner.run(1, false);
+        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
+        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
+        runner.clearTransferState();
+
+        // Update the cluster state
+        
runner.getStateManager().setState(singletonMap(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO,
 String.valueOf(currentTimeMillis())), Scope.CLUSTER);
+        runNext(runner);
+
+        // We expect activation marker only
+        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
+        
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_ACTIVITY_RESTORED);
+    }
+
+    @Test
+    public void 
testDisconnectNodeAndActivateBothTheOtherNodesAndTheDisconnectedNodeIndependently()
 throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new 
TestableProcessor(TimeUnit.DAYS.toMillis(1)));
+        runner.setIsConfiguredForClustering(true);
+        runner.setConnected(true);
+        runner.setPrimaryNode(false);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, 
MonitorActivity.SCOPE_CLUSTER);
+        runner.setProperty(MonitorActivity.REPORTING_NODE, 
MonitorActivity.REPORT_NODE_ALL);
+        runner.setProperty(MonitorActivity.THRESHOLD, "3 hours");
+
+        // Becomes inactive
+        runner.run(1, false);
+        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
+        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
+        runner.clearTransferState();
+
+        // Disconnect the node, and feed in a flow file to activate it
+        runner.setConnected(false);
+        runner.enqueue("Incoming data");
+        runNext(runner);
+
+        // We expect both activation marker, and forwarded FF
+        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1);
+        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
+        runner.clearTransferState();
+
+        // Update the cluster state too, and reconnect. This simulates other 
nodes being activated.
+        
runner.getStateManager().setState(singletonMap(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO,
 String.valueOf(currentTimeMillis())), Scope.CLUSTER);
+        runner.setConnected(true);
+        runNext(runner);
+
+        // We expect no output
+        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
+        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
+        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
+    }
+
+    @Test
+    public void testClusterStateIsImmediatelyUpdatedOnActivation() throws 
IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new 
TestableProcessor(TimeUnit.DAYS.toMillis(1)));
+        runner.setIsConfiguredForClustering(true);
+        runner.setConnected(true);
+        runner.setPrimaryNode(false);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, 
MonitorActivity.SCOPE_CLUSTER);
+        runner.setProperty(MonitorActivity.REPORTING_NODE, 
MonitorActivity.REPORT_NODE_ALL);
+        runner.setProperty(MonitorActivity.THRESHOLD, "3 hours");
+
+        // Becomes inactive
+        runner.run(1, false);
+        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
+        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
+        runner.clearTransferState();
+
+        
assertNull(runner.getStateManager().getState(Scope.CLUSTER).get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
+        runner.enqueue("Incoming data");
+        runNext(runner);
+        
assertNotNull(runner.getStateManager().getState(Scope.CLUSTER).get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
+    }
+
+    @Test
+    public void testResetStateOnStartupByDefault() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new 
MonitorActivity());
+        runner.setIsConfiguredForClustering(false);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, 
MonitorActivity.SCOPE_NODE);
+        runner.setProperty(MonitorActivity.THRESHOLD, "24 hours");
+
+        runner.getStateManager().setState(
+                singletonMap(
+                        MonitorActivity.STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO,
+                        String.valueOf(currentTimeMillis() - 
TimeUnit.DAYS.toMillis(1))
+                ),
+                Scope.LOCAL
+        );
+
+        runner.enqueue("Incoming data");
+        runner.run();
+
+        // We expect only the FF as output
+        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
+    }
+
+    @Test
+    public void testResetStateOnStartupDisabled() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new 
MonitorActivity());
+        runner.setIsConfiguredForClustering(false);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, 
MonitorActivity.SCOPE_NODE);
+        runner.setProperty(MonitorActivity.THRESHOLD, "24 hours");
+        runner.setProperty(MonitorActivity.RESET_STATE_ON_RESTART, 
Boolean.FALSE.toString());
+
+        runner.getStateManager().setState(
+                singletonMap(
+                        MonitorActivity.STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO,
+                        String.valueOf(currentTimeMillis() - 
TimeUnit.DAYS.toMillis(1))
+                ),
+                Scope.LOCAL
+        );
+
+        runner.enqueue("Incoming data");
+        runner.run();
+
+        // We expect both activation marker, and forwarded FF
+        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1);
+        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
+    }
+
+    @Test
+    public void 
testMultipleFlowFilesActivateTheFlowInSingleTriggerResultsInSingleMarker() 
throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new 
TestableProcessor(TimeUnit.DAYS.toMillis(1)));
+        runner.setIsConfiguredForClustering(false);
+        runner.setProperty(MonitorActivity.MONITORING_SCOPE, 
MonitorActivity.SCOPE_NODE);
+        runner.setProperty(MonitorActivity.THRESHOLD, "3 hours");
+
+        // Becomes inactive
+        runner.run(1, false);
+        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
+        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
+        runner.clearTransferState();
+
+        // Adding flow files
+        runner.enqueue("Incoming data 1");
+        runner.enqueue("Incoming data 2");
+        runner.enqueue("Incoming data 3");
+        runNext(runner);
+
+        // We expect all three FFs forwarded, but only a single activation marker
+        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 3);
+        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
+    }
+}


Reply via email to