turcsanyip commented on code in PR #8669:
URL: https://github.com/apache/nifi/pull/8669#discussion_r1592885962


##########
nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java:
##########
@@ -447,25 +457,202 @@ private boolean isReconnectedToCluster( final boolean 
isConnectedToCluster) {
         return !connectedWhenLastTriggered.get() && isConnectedToCluster;
     }
 
-    private void reconcileState(final ProcessContext context)  {
-        try {
-            final StateMap state = 
context.getStateManager().getState(Scope.CLUSTER);
-            final Map<String, String> newState = new HashMap<>();
-            newState.putAll(state.toMap());
+    private boolean shouldThisNodeReport(final boolean isClusterScope, final 
boolean isReportOnlyOnPrimary, final ProcessContext context) {
+        return !isClusterScope || ((!isReportOnlyOnPrimary || 
getNodeTypeProvider().isPrimary()) && context.isConnectedToCluster());
+    }
+
+    private void sendInactivityMarker(ProcessContext context, ProcessSession 
session, long inactivityStartMillis,
+            ComponentLog logger) {
+        FlowFile inactiveFlowFile = session.create();
+        inactiveFlowFile = session.putAttribute(
+                inactiveFlowFile,
+                "inactivityStartMillis", String.valueOf(inactivityStartMillis)
+        );
+        inactiveFlowFile = session.putAttribute(
+                inactiveFlowFile,
+                "inactivityDurationMillis",
+                String.valueOf(System.currentTimeMillis() - 
inactivityStartMillis)
+        );
+
+        final byte[] outBytes = 
context.getProperty(INACTIVITY_MESSAGE).evaluateAttributeExpressions(inactiveFlowFile).getValue().getBytes(
+                StandardCharsets.UTF_8);
+        inactiveFlowFile = session.write(inactiveFlowFile, out -> 
out.write(outBytes));
+
+        session.getProvenanceReporter().create(inactiveFlowFile);
+        session.transfer(inactiveFlowFile, REL_INACTIVE);
+        logger.info("Transferred {} to 'inactive'", inactiveFlowFile);
+    }
 
-            final long validLastSuccessTransfer = 
StringUtils.isEmpty(state.get(STATE_KEY_LATEST_SUCCESS_TRANSFER))
-                    ? latestSuccessTransfer.get()
-                    : 
Math.max(Long.valueOf(state.get(STATE_KEY_LATEST_SUCCESS_TRANSFER)), 
latestSuccessTransfer.get());
+    private void sendActivationMarker(ProcessContext context, ProcessSession 
session, Map<String, String> attributes,
+            long inactivityStartMillis, ComponentLog logger) {
+        FlowFile activityRestoredFlowFile = session.create();
+        // don't copy the UUID
+        attributes.remove(CoreAttributes.UUID.key());
+        activityRestoredFlowFile = 
session.putAllAttributes(activityRestoredFlowFile, attributes);
 
-            newState.put(STATE_KEY_LATEST_SUCCESS_TRANSFER, 
String.valueOf(validLastSuccessTransfer));
-            context.getStateManager().replace(state, newState, Scope.CLUSTER);
-        } catch (IOException e) {
-            getLogger().error("Could not reconcile state after (re)connection! 
Reason: " + e.getMessage());
-            throw new ProcessException(e);
+        activityRestoredFlowFile = 
session.putAttribute(activityRestoredFlowFile, "inactivityStartMillis", 
String.valueOf(
+                inactivityStartMillis));
+        activityRestoredFlowFile = 
session.putAttribute(activityRestoredFlowFile, "inactivityDurationMillis", 
String.valueOf(System.currentTimeMillis() - inactivityStartMillis));
+
+        final byte[] outBytes = 
context.getProperty(ACTIVITY_RESTORED_MESSAGE).evaluateAttributeExpressions(activityRestoredFlowFile).getValue().getBytes(
+                StandardCharsets.UTF_8);
+        activityRestoredFlowFile = session.write(activityRestoredFlowFile, out 
-> out.write(outBytes));
+
+        session.getProvenanceReporter().create(activityRestoredFlowFile);
+        session.transfer(activityRestoredFlowFile, REL_ACTIVITY_RESTORED);
+        logger.info("Transferred {} to 'activity.restored'", 
activityRestoredFlowFile);
+    }
+
+    private static class LocalFlowActivityInfo {
+        private static final long NO_VALUE = 0;
+
+        private final long startupTimeMillis;
+        private final long thresholdMillis;
+        private final boolean saveAttributes;
+
+        private long nextSyncMillis = NO_VALUE;
+        private long lastSuccessfulTransfer = NO_VALUE;
+        private Map<String, String> lastSuccessfulTransferAttributes = new 
HashMap<>();
+
+        public LocalFlowActivityInfo(long startupTimeMillis, long 
thresholdMillis, boolean saveAttributes) {
+            this.startupTimeMillis = startupTimeMillis;
+            this.thresholdMillis = thresholdMillis;
+            this.saveAttributes = saveAttributes;
+        }
+
+        public LocalFlowActivityInfo(long startupTimeMillis, long 
thresholdMillis, boolean saveAttributes, long initialLastSuccessfulTransfer) {
+            this(startupTimeMillis, thresholdMillis, saveAttributes);
+            lastSuccessfulTransfer = initialLastSuccessfulTransfer;
+        }
+
+        public synchronized boolean syncNeeded() {

Review Comment:
   The whole execution is single threaded due to `@TriggerSerially`. Therefore 
`synchronized` is not needed and can be removed.



##########
nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java:
##########
@@ -217,224 +230,221 @@ protected List<PropertyDescriptor> 
getSupportedPropertyDescriptors() {
     public void onScheduled(final ProcessContext context) {
         // Check configuration.
         isClusterScope(context, true);
-        resetLastSuccessfulTransfer();
-        inactive.set(false);
-        hasSuccessTransfer.set(false);
-    }
-
-
-    protected void resetLastSuccessfulTransfer() {
-        setLastSuccessfulTransfer(System.currentTimeMillis());
-    }
 
-    protected final void setLastSuccessfulTransfer(final long timestamp) {
-        latestSuccessTransfer.set(timestamp);
-        latestReportedNodeState.set(timestamp);
-    }
-
-    protected final long getLatestSuccessTransfer() {
-        return latestSuccessTransfer.get();
-    }
-
-    private boolean isClusterScope(final ProcessContext context, boolean 
logInvalidConfig) {
-        if 
(SCOPE_CLUSTER.equals(context.getProperty(MONITORING_SCOPE).getValue())) {
-            if (getNodeTypeProvider().isConfiguredForClustering()) {
-                return true;
-            }
-            if (logInvalidConfig) {
-                getLogger().warn("NiFi is running as a Standalone mode, but 
'cluster' scope is set." +
-                        " Fallback to 'node' scope. Fix configuration to stop 
this message.");
-            }
+        final long thresholdMillis = 
context.getProperty(THRESHOLD).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+        final boolean copyAttributes = 
context.getProperty(COPY_ATTRIBUTES).asBoolean();
+        final boolean resetStateOnRestart = 
context.getProperty(RESET_STATE_ON_RESTART).asBoolean();
+
+        // Attempt to load last state by the time of stopping this processor. 
A local state only exists if
+        // the monitored flow was already inactive, when the processor was 
shutting down.
+        final String storedLastSuccessfulTransfer = resetStateOnRestart ? null 
: tryLoadLastSuccessfulTransfer(context);
+
+        if (storedLastSuccessfulTransfer != null) {
+            // Initialize local flow as being inactive since the stored 
timestamp.
+            localFlowActivityInfo = new LocalFlowActivityInfo(
+                    getStartupTime(), thresholdMillis, copyAttributes, 
Long.parseLong(storedLastSuccessfulTransfer));
+            wasActive.set(localFlowActivityInfo.isActive());
+            inactivityStartMillis.set(localFlowActivityInfo.getLastActivity());
+        } else {
+            // Initialize local flow as being active. If there is no traffic, 
then it will eventually become inactive.
+            localFlowActivityInfo = new LocalFlowActivityInfo(
+                    getStartupTime(), thresholdMillis, copyAttributes);
+            wasActive.set(true);
         }
-        return false;
     }
 
-    private boolean shouldReportOnlyOnPrimary(boolean isClusterScope, final 
ProcessContext context) {
-        if 
(REPORT_NODE_PRIMARY.equals(context.getProperty(REPORTING_NODE).getValue())) {
-            if (isClusterScope) {
-                return true;
+    @OnStopped
+    public void onStopped(final ProcessContext context) {
+        if (getNodeTypeProvider().isConfiguredForClustering()) {
+            // Shared state needs to be cleared, in order to avoid getting 
inactive markers right after starting the
+            // flow after a weekend stop. In single-node setup, there is no 
shared state to be cleared, but the line
+            // below would also wipe out the local state. Hence, the check.
+            final StateManager stateManager = context.getStateManager();
+            try {
+                stateManager.clear(Scope.CLUSTER);
+            } catch (IOException e) {
+                getLogger().error("Failed to clear cluster state due to " + e, 
e);
             }
         }
-        return false;
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
-        final long thresholdMillis = 
context.getProperty(THRESHOLD).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
-        final long now = System.currentTimeMillis();
-
         final ComponentLog logger = getLogger();
-        final boolean copyAttributes = 
context.getProperty(COPY_ATTRIBUTES).asBoolean();
-        final boolean waitForActivity = 
context.getProperty(WAIT_FOR_ACTIVITY).asBoolean();
+
         final boolean isClusterScope = isClusterScope(context, false);
         final boolean isConnectedToCluster = context.isConnectedToCluster();
-        final boolean shouldReportOnlyOnPrimary = 
shouldReportOnlyOnPrimary(isClusterScope, context);
+        final boolean wasActive = this.wasActive.get();
+
         final List<FlowFile> flowFiles = session.get(50);
 
-        if (isClusterScope(context, true)) {
-            if (isReconnectedToCluster(isConnectedToCluster)) {
-                reconcileState(context);
-                connectedWhenLastTriggered.set(true);
-            } else if (!isConnectedToCluster) {
-                connectedWhenLastTriggered.set(false);
+        if (!flowFiles.isEmpty()) {
+            final boolean firstKnownTransfer = 
!localFlowActivityInfo.hasSuccessfulTransfer();
+            final boolean flowStateMustBecomeActive = !wasActive || 
firstKnownTransfer;
+
+            localFlowActivityInfo.update(flowFiles.get(0));
+
+            if (isClusterScope && flowStateMustBecomeActive) {
+                localFlowActivityInfo.forceSync();
             }
         }
 
-        boolean isInactive = false;
-        long updatedLatestSuccessTransfer = -1;
-        StateMap clusterState = null;
-
-        if (flowFiles.isEmpty()) {
-            final long previousSuccessMillis = latestSuccessTransfer.get();
-
-            boolean sendInactiveMarker = false;
-
-            isInactive = (now >= previousSuccessMillis + thresholdMillis);
-            logger.debug("isInactive={}, previousSuccessMillis={}, now={}", 
new Object[]{isInactive, previousSuccessMillis, now});
-            if (isInactive && isClusterScope && isConnectedToCluster) {
-                // Even if this node has been inactive, there may be other 
nodes handling flow actively.
-                // However, if this node is active, we don't have to look at 
cluster state.
-                try {
-                    clusterState = session.getState(Scope.CLUSTER);
-                    if (clusterState != null && 
!StringUtils.isEmpty(clusterState.get(STATE_KEY_LATEST_SUCCESS_TRANSFER))) {
-                        final long latestReportedClusterActivity = 
Long.valueOf(clusterState.get(STATE_KEY_LATEST_SUCCESS_TRANSFER));
-                        isInactive = (now >= latestReportedClusterActivity + 
thresholdMillis);
-                        if (!isInactive) {
-                            // This node has been inactive, but other node has 
more recent activity.
-                            updatedLatestSuccessTransfer = 
latestReportedClusterActivity;
-                        }
-                        logger.debug("isInactive={}, 
latestReportedClusterActivity={}", new Object[]{isInactive, 
latestReportedClusterActivity});
-                    }
-                } catch (IOException e) {
-                    logger.error("Failed to access cluster state. Activity 
will not be monitored properly until this is addressed.", e);
-                }
+        if (isClusterScope) {
+            if (wasActive && !localFlowActivityInfo.isActive()) {
+                localFlowActivityInfo.forceSync();
             }
+            synchronizeState(context);
+        }
+
+        final long thresholdMillis = 
context.getProperty(THRESHOLD).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+        final boolean continuallySendMessages = 
context.getProperty(CONTINUALLY_SEND_MESSAGES).asBoolean();
+        final boolean waitForActivity = 
context.getProperty(WAIT_FOR_ACTIVITY).asBoolean();
 
-            if (isInactive) {
-                final boolean continual = 
context.getProperty(CONTINUALLY_SEND_MESSAGES).asBoolean();
-                sendInactiveMarker = !inactive.getAndSet(true) || (continual 
&& (now > lastInactiveMessage.get() + thresholdMillis));
-                if (waitForActivity) {
-                    sendInactiveMarker = sendInactiveMarker && 
hasSuccessTransfer.get();
-                }
+        final boolean isActive = localFlowActivityInfo.isActive() || 
!flowFiles.isEmpty();
+        final long lastActivity = localFlowActivityInfo.getLastActivity();
+        final long inactivityStartMillis = this.inactivityStartMillis.get();
+        final boolean timeToRepeatInactiveMessage = (lastInactiveMessage.get() 
+ thresholdMillis) <= System.currentTimeMillis();
+
+        final boolean canBecomeInactive = (!isClusterScope || 
isConnectedToCluster)
+                && (!waitForActivity || 
localFlowActivityInfo.hasSuccessfulTransfer());
+
+        if (isActive) {
+            onTriggerActiveFlow(context, session, wasActive, isClusterScope, 
inactivityStartMillis, flowFiles);
+        } else if (canBecomeInactive && (wasActive || (continuallySendMessages 
&& timeToRepeatInactiveMessage))) {
+            onTriggerInactiveFlow(context, session, isClusterScope, 
lastActivity);
+        } else {
+            context.yield();    // no need to dominate CPU checking times; let 
other processors run for a bit.
+        }
+
+        if (wasActive && !canBecomeInactive) {
+            // We need to block ACTIVE -> INACTIVE state transition, because 
we are not connected to the cluster.
+            // When we reconnect, and the INACTIVE state persists, then the 
next onTrigger will do the transition.
+            logger.trace("ACTIVE->INACTIVE transition is blocked, because we 
are not connected to the cluster.");
+        } else {
+            this.wasActive.set(isActive);
+            this.inactivityStartMillis.set(lastActivity);
+        }
+    }
+
+    protected long getStartupTime() {
+        return System.currentTimeMillis();
+    }
+
+    protected final long getLatestSuccessTransfer() {
+        return localFlowActivityInfo.getLastSuccessfulTransfer();
+    }
+
+    private String tryLoadLastSuccessfulTransfer(ProcessContext context) {
+        final StateManager stateManager = context.getStateManager();
+        try {
+            final StateMap localStateMap = stateManager.getState(Scope.LOCAL);
+            return localStateMap.get(STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO);
+        } catch (IOException e) {
+            throw new ProcessException("Failed to load local state due to " + 
e, e);
+        }
+    }
+
+    private void synchronizeState(ProcessContext context) {
+        final ComponentLog logger = getLogger();
+        final boolean isConnectedToCluster = context.isConnectedToCluster();
+
+        if (isReconnectedToCluster(isConnectedToCluster)) {
+            localFlowActivityInfo.forceSync();
+            connectedWhenLastTriggered.set(true);
+        }
+        if (!isConnectedToCluster) {
+            connectedWhenLastTriggered.set(false);
+        } else if (localFlowActivityInfo.syncNeeded()) {
+            final CommonFlowActivityInfo commonFlowActivityInfo = new 
CommonFlowActivityInfo(context);
+            localFlowActivityInfo.update(commonFlowActivityInfo);
+
+            try {
+                commonFlowActivityInfo.update(localFlowActivityInfo);
+                localFlowActivityInfo.setNextSyncMillis();
+            } catch (final SaveSharedFlowStateException ex) {
+                logger.debug("Failed to update common state.", ex);
             }
+        }
+    }
 
-            if (sendInactiveMarker && shouldThisNodeReport(isClusterScope, 
shouldReportOnlyOnPrimary, context)) {
-                lastInactiveMessage.set(System.currentTimeMillis());
+    private void onTriggerInactiveFlow(ProcessContext context, ProcessSession 
session, boolean isClusterScope, long lastActivity) {
+        final ComponentLog logger = getLogger();
+        final boolean shouldReportOnlyOnPrimary = 
shouldReportOnlyOnPrimary(isClusterScope, context);
+        final boolean shouldThisNodeReport = 
shouldThisNodeReport(isClusterScope, shouldReportOnlyOnPrimary, context);

Review Comment:
   `shouldThisNodeReport()` could call `shouldReportOnlyOnPrimary()` internally 
because all the parameters are passed to that method too and 
`shouldReportOnlyOnPrimary` local variable is not used anywhere else.
   
   Also in `onTriggerActiveFlow()`.



##########
nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java:
##########
@@ -217,224 +230,221 @@ protected List<PropertyDescriptor> 
getSupportedPropertyDescriptors() {
     public void onScheduled(final ProcessContext context) {
         // Check configuration.
         isClusterScope(context, true);
-        resetLastSuccessfulTransfer();
-        inactive.set(false);
-        hasSuccessTransfer.set(false);
-    }
-
-
-    protected void resetLastSuccessfulTransfer() {
-        setLastSuccessfulTransfer(System.currentTimeMillis());
-    }
 
-    protected final void setLastSuccessfulTransfer(final long timestamp) {
-        latestSuccessTransfer.set(timestamp);
-        latestReportedNodeState.set(timestamp);
-    }
-
-    protected final long getLatestSuccessTransfer() {
-        return latestSuccessTransfer.get();
-    }
-
-    private boolean isClusterScope(final ProcessContext context, boolean 
logInvalidConfig) {
-        if 
(SCOPE_CLUSTER.equals(context.getProperty(MONITORING_SCOPE).getValue())) {
-            if (getNodeTypeProvider().isConfiguredForClustering()) {
-                return true;
-            }
-            if (logInvalidConfig) {
-                getLogger().warn("NiFi is running as a Standalone mode, but 
'cluster' scope is set." +
-                        " Fallback to 'node' scope. Fix configuration to stop 
this message.");
-            }
+        final long thresholdMillis = 
context.getProperty(THRESHOLD).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+        final boolean copyAttributes = 
context.getProperty(COPY_ATTRIBUTES).asBoolean();
+        final boolean resetStateOnRestart = 
context.getProperty(RESET_STATE_ON_RESTART).asBoolean();
+
+        // Attempt to load last state by the time of stopping this processor. 
A local state only exists if
+        // the monitored flow was already inactive, when the processor was 
shutting down.
+        final String storedLastSuccessfulTransfer = resetStateOnRestart ? null 
: tryLoadLastSuccessfulTransfer(context);
+
+        if (storedLastSuccessfulTransfer != null) {
+            // Initialize local flow as being inactive since the stored 
timestamp.
+            localFlowActivityInfo = new LocalFlowActivityInfo(
+                    getStartupTime(), thresholdMillis, copyAttributes, 
Long.parseLong(storedLastSuccessfulTransfer));
+            wasActive.set(localFlowActivityInfo.isActive());
+            inactivityStartMillis.set(localFlowActivityInfo.getLastActivity());
+        } else {
+            // Initialize local flow as being active. If there is no traffic, 
then it will eventually become inactive.
+            localFlowActivityInfo = new LocalFlowActivityInfo(
+                    getStartupTime(), thresholdMillis, copyAttributes);
+            wasActive.set(true);
         }
-        return false;
     }
 
-    private boolean shouldReportOnlyOnPrimary(boolean isClusterScope, final 
ProcessContext context) {
-        if 
(REPORT_NODE_PRIMARY.equals(context.getProperty(REPORTING_NODE).getValue())) {
-            if (isClusterScope) {
-                return true;
+    @OnStopped
+    public void onStopped(final ProcessContext context) {
+        if (getNodeTypeProvider().isConfiguredForClustering()) {
+            // Shared state needs to be cleared, in order to avoid getting 
inactive markers right after starting the
+            // flow after a weekend stop. In single-node setup, there is no 
shared state to be cleared, but the line
+            // below would also wipe out the local state. Hence, the check.
+            final StateManager stateManager = context.getStateManager();
+            try {
+                stateManager.clear(Scope.CLUSTER);
+            } catch (IOException e) {
+                getLogger().error("Failed to clear cluster state due to " + e, 
e);
             }
         }
-        return false;
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
-        final long thresholdMillis = 
context.getProperty(THRESHOLD).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
-        final long now = System.currentTimeMillis();
-
         final ComponentLog logger = getLogger();
-        final boolean copyAttributes = 
context.getProperty(COPY_ATTRIBUTES).asBoolean();
-        final boolean waitForActivity = 
context.getProperty(WAIT_FOR_ACTIVITY).asBoolean();
+
         final boolean isClusterScope = isClusterScope(context, false);
         final boolean isConnectedToCluster = context.isConnectedToCluster();
-        final boolean shouldReportOnlyOnPrimary = 
shouldReportOnlyOnPrimary(isClusterScope, context);
+        final boolean wasActive = this.wasActive.get();
+
         final List<FlowFile> flowFiles = session.get(50);
 
-        if (isClusterScope(context, true)) {
-            if (isReconnectedToCluster(isConnectedToCluster)) {
-                reconcileState(context);
-                connectedWhenLastTriggered.set(true);
-            } else if (!isConnectedToCluster) {
-                connectedWhenLastTriggered.set(false);
+        if (!flowFiles.isEmpty()) {
+            final boolean firstKnownTransfer = 
!localFlowActivityInfo.hasSuccessfulTransfer();
+            final boolean flowStateMustBecomeActive = !wasActive || 
firstKnownTransfer;
+
+            localFlowActivityInfo.update(flowFiles.get(0));
+
+            if (isClusterScope && flowStateMustBecomeActive) {
+                localFlowActivityInfo.forceSync();
             }
         }
 
-        boolean isInactive = false;
-        long updatedLatestSuccessTransfer = -1;
-        StateMap clusterState = null;
-
-        if (flowFiles.isEmpty()) {
-            final long previousSuccessMillis = latestSuccessTransfer.get();
-
-            boolean sendInactiveMarker = false;
-
-            isInactive = (now >= previousSuccessMillis + thresholdMillis);
-            logger.debug("isInactive={}, previousSuccessMillis={}, now={}", 
new Object[]{isInactive, previousSuccessMillis, now});
-            if (isInactive && isClusterScope && isConnectedToCluster) {
-                // Even if this node has been inactive, there may be other 
nodes handling flow actively.
-                // However, if this node is active, we don't have to look at 
cluster state.
-                try {
-                    clusterState = session.getState(Scope.CLUSTER);
-                    if (clusterState != null && 
!StringUtils.isEmpty(clusterState.get(STATE_KEY_LATEST_SUCCESS_TRANSFER))) {
-                        final long latestReportedClusterActivity = 
Long.valueOf(clusterState.get(STATE_KEY_LATEST_SUCCESS_TRANSFER));
-                        isInactive = (now >= latestReportedClusterActivity + 
thresholdMillis);
-                        if (!isInactive) {
-                            // This node has been inactive, but other node has 
more recent activity.
-                            updatedLatestSuccessTransfer = 
latestReportedClusterActivity;
-                        }
-                        logger.debug("isInactive={}, 
latestReportedClusterActivity={}", new Object[]{isInactive, 
latestReportedClusterActivity});
-                    }
-                } catch (IOException e) {
-                    logger.error("Failed to access cluster state. Activity 
will not be monitored properly until this is addressed.", e);
-                }
+        if (isClusterScope) {
+            if (wasActive && !localFlowActivityInfo.isActive()) {
+                localFlowActivityInfo.forceSync();
             }
+            synchronizeState(context);
+        }
+
+        final long thresholdMillis = 
context.getProperty(THRESHOLD).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+        final boolean continuallySendMessages = 
context.getProperty(CONTINUALLY_SEND_MESSAGES).asBoolean();
+        final boolean waitForActivity = 
context.getProperty(WAIT_FOR_ACTIVITY).asBoolean();
 
-            if (isInactive) {
-                final boolean continual = 
context.getProperty(CONTINUALLY_SEND_MESSAGES).asBoolean();
-                sendInactiveMarker = !inactive.getAndSet(true) || (continual 
&& (now > lastInactiveMessage.get() + thresholdMillis));
-                if (waitForActivity) {
-                    sendInactiveMarker = sendInactiveMarker && 
hasSuccessTransfer.get();
-                }
+        final boolean isActive = localFlowActivityInfo.isActive() || 
!flowFiles.isEmpty();
+        final long lastActivity = localFlowActivityInfo.getLastActivity();
+        final long inactivityStartMillis = this.inactivityStartMillis.get();
+        final boolean timeToRepeatInactiveMessage = (lastInactiveMessage.get() 
+ thresholdMillis) <= System.currentTimeMillis();
+
+        final boolean canBecomeInactive = (!isClusterScope || 
isConnectedToCluster)
+                && (!waitForActivity || 
localFlowActivityInfo.hasSuccessfulTransfer());
+
+        if (isActive) {
+            onTriggerActiveFlow(context, session, wasActive, isClusterScope, 
inactivityStartMillis, flowFiles);
+        } else if (canBecomeInactive && (wasActive || (continuallySendMessages 
&& timeToRepeatInactiveMessage))) {
+            onTriggerInactiveFlow(context, session, isClusterScope, 
lastActivity);
+        } else {
+            context.yield();    // no need to dominate CPU checking times; let 
other processors run for a bit.
+        }
+
+        if (wasActive && !canBecomeInactive) {
+            // We need to block ACTIVE -> INACTIVE state transition, because 
we are not connected to the cluster.
+            // When we reconnect, and the INACTIVE state persists, then the 
next onTrigger will do the transition.
+            logger.trace("ACTIVE->INACTIVE transition is blocked, because we 
are not connected to the cluster.");
+        } else {
+            this.wasActive.set(isActive);
+            this.inactivityStartMillis.set(lastActivity);
+        }
+    }
+
+    protected long getStartupTime() {
+        return System.currentTimeMillis();
+    }
+
+    protected final long getLatestSuccessTransfer() {
+        return localFlowActivityInfo.getLastSuccessfulTransfer();
+    }

Review Comment:
   ```suggestion
       protected final long getLastSuccessfulTransfer() {
           return localFlowActivityInfo.getLastSuccessfulTransfer();
       }
   ```



##########
nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java:
##########
@@ -72,10 +70,18 @@
 @WritesAttributes({
     @WritesAttribute(attribute = "inactivityStartMillis", description = "The 
time at which Inactivity began, in the form of milliseconds since Epoch"),
     @WritesAttribute(attribute = "inactivityDurationMillis", description = 
"The number of milliseconds that the inactivity has spanned")})
-@Stateful(scopes = Scope.CLUSTER, description = "MonitorActivity stores the 
last timestamp at each node as state, so that it can examine activity at 
cluster wide." +
-        "If 'Copy Attribute' is set to true, then flow file attributes are 
also persisted.")
+@Stateful(
+        scopes = { Scope.CLUSTER, Scope.LOCAL },
+        description = "MonitorActivity stores the last timestamp at each node 
as state, "
+                + "so that it can examine activity at cluster wide. "
+                + "If 'Copy Attribute' is set to true, then flow file 
attributes are also persisted. "
+                + "In local scope, it stores last known activity timestamp if 
the flow is inactive."
+)
 public class MonitorActivity extends AbstractProcessor {
 
+    public static final String STATE_KEY_LATEST_SUCCESS_TRANSFER = 
"MonitorActivity.latestSuccessTransfer";
+    public static final String STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO = 
"LocalFlowActivityInfo.lastSuccessfulTransfer";

Review Comment:
   `lastSuccessfulTransfer` is used in every other places. For sake of 
consistency, I would suggest renaming it too:
   ```suggestion
       public static final String STATE_KEY_COMMON_FLOW_ACTIVITY_INFO = 
"CommonFlowActivityInfo.lastSuccessfulTransfer";
       public static final String STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO = 
"LocalFlowActivityInfo.lastSuccessfulTransfer";
   ```



##########
nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java:
##########
@@ -217,224 +230,221 @@ protected List<PropertyDescriptor> 
getSupportedPropertyDescriptors() {
     public void onScheduled(final ProcessContext context) {
         // Check configuration.
         isClusterScope(context, true);
-        resetLastSuccessfulTransfer();
-        inactive.set(false);
-        hasSuccessTransfer.set(false);
-    }
-
-
-    protected void resetLastSuccessfulTransfer() {
-        setLastSuccessfulTransfer(System.currentTimeMillis());
-    }
 
-    protected final void setLastSuccessfulTransfer(final long timestamp) {
-        latestSuccessTransfer.set(timestamp);
-        latestReportedNodeState.set(timestamp);
-    }
-
-    protected final long getLatestSuccessTransfer() {
-        return latestSuccessTransfer.get();
-    }
-
-    private boolean isClusterScope(final ProcessContext context, boolean 
logInvalidConfig) {
-        if 
(SCOPE_CLUSTER.equals(context.getProperty(MONITORING_SCOPE).getValue())) {
-            if (getNodeTypeProvider().isConfiguredForClustering()) {
-                return true;
-            }
-            if (logInvalidConfig) {
-                getLogger().warn("NiFi is running as a Standalone mode, but 
'cluster' scope is set." +
-                        " Fallback to 'node' scope. Fix configuration to stop 
this message.");
-            }
+        final long thresholdMillis = 
context.getProperty(THRESHOLD).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+        final boolean copyAttributes = 
context.getProperty(COPY_ATTRIBUTES).asBoolean();
+        final boolean resetStateOnRestart = 
context.getProperty(RESET_STATE_ON_RESTART).asBoolean();
+
+        // Attempt to load last state by the time of stopping this processor. 
A local state only exists if
+        // the monitored flow was already inactive, when the processor was 
shutting down.
+        final String storedLastSuccessfulTransfer = resetStateOnRestart ? null 
: tryLoadLastSuccessfulTransfer(context);
+
+        if (storedLastSuccessfulTransfer != null) {
+            // Initialize local flow as being inactive since the stored 
timestamp.
+            localFlowActivityInfo = new LocalFlowActivityInfo(
+                    getStartupTime(), thresholdMillis, copyAttributes, 
Long.parseLong(storedLastSuccessfulTransfer));
+            wasActive.set(localFlowActivityInfo.isActive());
+            inactivityStartMillis.set(localFlowActivityInfo.getLastActivity());
+        } else {
+            // Initialize local flow as being active. If there is no traffic, 
then it will eventually become inactive.
+            localFlowActivityInfo = new LocalFlowActivityInfo(
+                    getStartupTime(), thresholdMillis, copyAttributes);
+            wasActive.set(true);
         }
-        return false;
     }
 
-    private boolean shouldReportOnlyOnPrimary(boolean isClusterScope, final 
ProcessContext context) {
-        if 
(REPORT_NODE_PRIMARY.equals(context.getProperty(REPORTING_NODE).getValue())) {
-            if (isClusterScope) {
-                return true;
+    @OnStopped
+    public void onStopped(final ProcessContext context) {
+        if (getNodeTypeProvider().isConfiguredForClustering()) {
+            // Shared state needs to be cleared, in order to avoid getting 
inactive markers right after starting the
+            // flow after a weekend stop. In single-node setup, there is no 
shared state to be cleared, but the line
+            // below would also wipe out the local state. Hence, the check.
+            final StateManager stateManager = context.getStateManager();
+            try {
+                stateManager.clear(Scope.CLUSTER);

Review Comment:
   `getNodeTypeProvider().isConfiguredForClustering()` returns `true` for a 
disconnected node and it would clear its local state on restart (cluster scope 
falls back to local scope in case of disconnected nodes).
   As only the shared state needs to cleared but not the local one, I suggest 
adding `context.isConnectedToCluster()` to the condition.



##########
nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java:
##########
@@ -217,224 +230,221 @@ protected List<PropertyDescriptor> 
getSupportedPropertyDescriptors() {
     public void onScheduled(final ProcessContext context) {
         // Check configuration.
         isClusterScope(context, true);
-        resetLastSuccessfulTransfer();
-        inactive.set(false);
-        hasSuccessTransfer.set(false);
-    }
-
-
-    protected void resetLastSuccessfulTransfer() {
-        setLastSuccessfulTransfer(System.currentTimeMillis());
-    }
 
-    protected final void setLastSuccessfulTransfer(final long timestamp) {
-        latestSuccessTransfer.set(timestamp);
-        latestReportedNodeState.set(timestamp);
-    }
-
-    protected final long getLatestSuccessTransfer() {
-        return latestSuccessTransfer.get();
-    }
-
-    private boolean isClusterScope(final ProcessContext context, boolean 
logInvalidConfig) {
-        if 
(SCOPE_CLUSTER.equals(context.getProperty(MONITORING_SCOPE).getValue())) {
-            if (getNodeTypeProvider().isConfiguredForClustering()) {
-                return true;
-            }
-            if (logInvalidConfig) {
-                getLogger().warn("NiFi is running as a Standalone mode, but 
'cluster' scope is set." +
-                        " Fallback to 'node' scope. Fix configuration to stop 
this message.");
-            }
+        final long thresholdMillis = 
context.getProperty(THRESHOLD).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+        final boolean copyAttributes = 
context.getProperty(COPY_ATTRIBUTES).asBoolean();
+        final boolean resetStateOnRestart = 
context.getProperty(RESET_STATE_ON_RESTART).asBoolean();
+
+        // Attempt to load last state by the time of stopping this processor. 
A local state only exists if
+        // the monitored flow was already inactive, when the processor was 
shutting down.
+        final String storedLastSuccessfulTransfer = resetStateOnRestart ? null 
: tryLoadLastSuccessfulTransfer(context);
+
+        if (storedLastSuccessfulTransfer != null) {
+            // Initialize local flow as being inactive since the stored 
timestamp.
+            localFlowActivityInfo = new LocalFlowActivityInfo(
+                    getStartupTime(), thresholdMillis, copyAttributes, 
Long.parseLong(storedLastSuccessfulTransfer));
+            wasActive.set(localFlowActivityInfo.isActive());
+            inactivityStartMillis.set(localFlowActivityInfo.getLastActivity());
+        } else {
+            // Initialize local flow as being active. If there is no traffic, 
then it will eventually become inactive.
+            localFlowActivityInfo = new LocalFlowActivityInfo(
+                    getStartupTime(), thresholdMillis, copyAttributes);
+            wasActive.set(true);
         }
-        return false;
     }
 
-    private boolean shouldReportOnlyOnPrimary(boolean isClusterScope, final 
ProcessContext context) {
-        if 
(REPORT_NODE_PRIMARY.equals(context.getProperty(REPORTING_NODE).getValue())) {
-            if (isClusterScope) {
-                return true;
+    @OnStopped
+    public void onStopped(final ProcessContext context) {
+        if (getNodeTypeProvider().isConfiguredForClustering()) {
+            // Shared state needs to be cleared, in order to avoid getting 
inactive markers right after starting the
+            // flow after a weekend stop. In single-node setup, there is no 
shared state to be cleared, but the line
+            // below would also wipe out the local state. Hence, the check.
+            final StateManager stateManager = context.getStateManager();
+            try {
+                stateManager.clear(Scope.CLUSTER);
+            } catch (IOException e) {
+                getLogger().error("Failed to clear cluster state due to " + e, 
e);

Review Comment:
   It is enough to pass the exception once and its message and stack trace will 
be logged. Please remove the other "due to" clauses too.
   ```suggestion
                   getLogger().error("Failed to clear cluster state", e);
   ```



##########
nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java:
##########
@@ -170,18 +184,16 @@ public class MonitorActivity extends AbstractProcessor {
             .description("This relationship is used to transfer an Activity 
Restored indicator when FlowFiles are routing to 'success' following a "
                     + "period of inactivity")
             .build();
-    public static final Charset UTF8 = Charset.forName("UTF-8");
 
     private List<PropertyDescriptor> properties;
     private Set<Relationship> relationships;
 
-    private final AtomicLong latestSuccessTransfer = new 
AtomicLong(System.currentTimeMillis());
-    private final AtomicLong latestReportedNodeState = new 
AtomicLong(System.currentTimeMillis());
-    private final AtomicBoolean inactive = new AtomicBoolean(false);
-    private final AtomicBoolean hasSuccessTransfer = new AtomicBoolean(false);
     private final AtomicBoolean connectedWhenLastTriggered = new 
AtomicBoolean(false);
-    private final AtomicLong lastInactiveMessage = new 
AtomicLong(System.currentTimeMillis());
-    public static final String STATE_KEY_LATEST_SUCCESS_TRANSFER = 
"MonitorActivity.latestSuccessTransfer";
+    private final AtomicLong lastInactiveMessage = new AtomicLong();
+    private final AtomicLong inactivityStartMillis = new 
AtomicLong(System.currentTimeMillis());
+    private final AtomicBoolean wasActive = new AtomicBoolean(true);
+
+    private LocalFlowActivityInfo localFlowActivityInfo;

Review Comment:
   The field should be `volatile` because it is accessed by different threads 
(written by `onScheduled()`, read by `onTrigger()`)
   ```suggestion
       private volatile LocalFlowActivityInfo localFlowActivityInfo;
   ```



##########
nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java:
##########
@@ -217,224 +230,221 @@ protected List<PropertyDescriptor> 
getSupportedPropertyDescriptors() {
     public void onScheduled(final ProcessContext context) {
         // Check configuration.
         isClusterScope(context, true);
-        resetLastSuccessfulTransfer();
-        inactive.set(false);
-        hasSuccessTransfer.set(false);
-    }
-
-
-    protected void resetLastSuccessfulTransfer() {
-        setLastSuccessfulTransfer(System.currentTimeMillis());
-    }
 
-    protected final void setLastSuccessfulTransfer(final long timestamp) {
-        latestSuccessTransfer.set(timestamp);
-        latestReportedNodeState.set(timestamp);
-    }
-
-    protected final long getLatestSuccessTransfer() {
-        return latestSuccessTransfer.get();
-    }
-
-    private boolean isClusterScope(final ProcessContext context, boolean 
logInvalidConfig) {
-        if 
(SCOPE_CLUSTER.equals(context.getProperty(MONITORING_SCOPE).getValue())) {
-            if (getNodeTypeProvider().isConfiguredForClustering()) {
-                return true;
-            }
-            if (logInvalidConfig) {
-                getLogger().warn("NiFi is running as a Standalone mode, but 
'cluster' scope is set." +
-                        " Fallback to 'node' scope. Fix configuration to stop 
this message.");
-            }
+        final long thresholdMillis = 
context.getProperty(THRESHOLD).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+        final boolean copyAttributes = 
context.getProperty(COPY_ATTRIBUTES).asBoolean();
+        final boolean resetStateOnRestart = 
context.getProperty(RESET_STATE_ON_RESTART).asBoolean();
+
+        // Attempt to load last state by the time of stopping this processor. 
A local state only exists if
+        // the monitored flow was already inactive, when the processor was 
shutting down.
+        final String storedLastSuccessfulTransfer = resetStateOnRestart ? null 
: tryLoadLastSuccessfulTransfer(context);
+
+        if (storedLastSuccessfulTransfer != null) {
+            // Initialize local flow as being inactive since the stored 
timestamp.
+            localFlowActivityInfo = new LocalFlowActivityInfo(
+                    getStartupTime(), thresholdMillis, copyAttributes, 
Long.parseLong(storedLastSuccessfulTransfer));
+            wasActive.set(localFlowActivityInfo.isActive());
+            inactivityStartMillis.set(localFlowActivityInfo.getLastActivity());
+        } else {
+            // Initialize local flow as being active. If there is no traffic, 
then it will eventually become inactive.
+            localFlowActivityInfo = new LocalFlowActivityInfo(
+                    getStartupTime(), thresholdMillis, copyAttributes);
+            wasActive.set(true);
         }
-        return false;
     }
 
-    private boolean shouldReportOnlyOnPrimary(boolean isClusterScope, final 
ProcessContext context) {
-        if 
(REPORT_NODE_PRIMARY.equals(context.getProperty(REPORTING_NODE).getValue())) {
-            if (isClusterScope) {
-                return true;
+    @OnStopped
+    public void onStopped(final ProcessContext context) {
+        if (getNodeTypeProvider().isConfiguredForClustering()) {
+            // Shared state needs to be cleared, in order to avoid getting 
inactive markers right after starting the
+            // flow after a weekend stop. In single-node setup, there is no 
shared state to be cleared, but the line
+            // below would also wipe out the local state. Hence, the check.
+            final StateManager stateManager = context.getStateManager();
+            try {
+                stateManager.clear(Scope.CLUSTER);
+            } catch (IOException e) {
+                getLogger().error("Failed to clear cluster state due to " + e, 
e);
             }
         }
-        return false;
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
-        final long thresholdMillis = 
context.getProperty(THRESHOLD).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
-        final long now = System.currentTimeMillis();
-
         final ComponentLog logger = getLogger();
-        final boolean copyAttributes = 
context.getProperty(COPY_ATTRIBUTES).asBoolean();
-        final boolean waitForActivity = 
context.getProperty(WAIT_FOR_ACTIVITY).asBoolean();
+
         final boolean isClusterScope = isClusterScope(context, false);
         final boolean isConnectedToCluster = context.isConnectedToCluster();
-        final boolean shouldReportOnlyOnPrimary = 
shouldReportOnlyOnPrimary(isClusterScope, context);
+        final boolean wasActive = this.wasActive.get();
+
         final List<FlowFile> flowFiles = session.get(50);
 
-        if (isClusterScope(context, true)) {
-            if (isReconnectedToCluster(isConnectedToCluster)) {
-                reconcileState(context);
-                connectedWhenLastTriggered.set(true);
-            } else if (!isConnectedToCluster) {
-                connectedWhenLastTriggered.set(false);
+        if (!flowFiles.isEmpty()) {
+            final boolean firstKnownTransfer = 
!localFlowActivityInfo.hasSuccessfulTransfer();
+            final boolean flowStateMustBecomeActive = !wasActive || 
firstKnownTransfer;
+
+            localFlowActivityInfo.update(flowFiles.get(0));
+
+            if (isClusterScope && flowStateMustBecomeActive) {
+                localFlowActivityInfo.forceSync();
             }
         }
 
-        boolean isInactive = false;
-        long updatedLatestSuccessTransfer = -1;
-        StateMap clusterState = null;
-
-        if (flowFiles.isEmpty()) {
-            final long previousSuccessMillis = latestSuccessTransfer.get();
-
-            boolean sendInactiveMarker = false;
-
-            isInactive = (now >= previousSuccessMillis + thresholdMillis);
-            logger.debug("isInactive={}, previousSuccessMillis={}, now={}", 
new Object[]{isInactive, previousSuccessMillis, now});
-            if (isInactive && isClusterScope && isConnectedToCluster) {
-                // Even if this node has been inactive, there may be other 
nodes handling flow actively.
-                // However, if this node is active, we don't have to look at 
cluster state.
-                try {
-                    clusterState = session.getState(Scope.CLUSTER);
-                    if (clusterState != null && 
!StringUtils.isEmpty(clusterState.get(STATE_KEY_LATEST_SUCCESS_TRANSFER))) {
-                        final long latestReportedClusterActivity = 
Long.valueOf(clusterState.get(STATE_KEY_LATEST_SUCCESS_TRANSFER));
-                        isInactive = (now >= latestReportedClusterActivity + 
thresholdMillis);
-                        if (!isInactive) {
-                            // This node has been inactive, but other node has 
more recent activity.
-                            updatedLatestSuccessTransfer = 
latestReportedClusterActivity;
-                        }
-                        logger.debug("isInactive={}, 
latestReportedClusterActivity={}", new Object[]{isInactive, 
latestReportedClusterActivity});
-                    }
-                } catch (IOException e) {
-                    logger.error("Failed to access cluster state. Activity 
will not be monitored properly until this is addressed.", e);
-                }
+        if (isClusterScope) {
+            if (wasActive && !localFlowActivityInfo.isActive()) {
+                localFlowActivityInfo.forceSync();
             }
+            synchronizeState(context);
+        }
+
+        final long thresholdMillis = 
context.getProperty(THRESHOLD).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+        final boolean continuallySendMessages = 
context.getProperty(CONTINUALLY_SEND_MESSAGES).asBoolean();
+        final boolean waitForActivity = 
context.getProperty(WAIT_FOR_ACTIVITY).asBoolean();
 
-            if (isInactive) {
-                final boolean continual = 
context.getProperty(CONTINUALLY_SEND_MESSAGES).asBoolean();
-                sendInactiveMarker = !inactive.getAndSet(true) || (continual 
&& (now > lastInactiveMessage.get() + thresholdMillis));
-                if (waitForActivity) {
-                    sendInactiveMarker = sendInactiveMarker && 
hasSuccessTransfer.get();
-                }
+        final boolean isActive = localFlowActivityInfo.isActive() || 
!flowFiles.isEmpty();
+        final long lastActivity = localFlowActivityInfo.getLastActivity();
+        final long inactivityStartMillis = this.inactivityStartMillis.get();
+        final boolean timeToRepeatInactiveMessage = (lastInactiveMessage.get() 
+ thresholdMillis) <= System.currentTimeMillis();
+
+        final boolean canBecomeInactive = (!isClusterScope || 
isConnectedToCluster)
+                && (!waitForActivity || 
localFlowActivityInfo.hasSuccessfulTransfer());
+
+        if (isActive) {
+            onTriggerActiveFlow(context, session, wasActive, isClusterScope, 
inactivityStartMillis, flowFiles);
+        } else if (canBecomeInactive && (wasActive || (continuallySendMessages 
&& timeToRepeatInactiveMessage))) {
+            onTriggerInactiveFlow(context, session, isClusterScope, 
lastActivity);
+        } else {
+            context.yield();    // no need to dominate CPU checking times; let 
other processors run for a bit.
+        }
+
+        if (wasActive && !canBecomeInactive) {
+            // We need to block ACTIVE -> INACTIVE state transition, because 
we are not connected to the cluster.
+            // When we reconnect, and the INACTIVE state persists, then the 
next onTrigger will do the transition.
+            logger.trace("ACTIVE->INACTIVE transition is blocked, because we 
are not connected to the cluster.");
+        } else {
+            this.wasActive.set(isActive);
+            this.inactivityStartMillis.set(lastActivity);
+        }
+    }
+
+    protected long getStartupTime() {
+        return System.currentTimeMillis();
+    }
+
+    protected final long getLatestSuccessTransfer() {
+        return localFlowActivityInfo.getLastSuccessfulTransfer();
+    }
+
+    private String tryLoadLastSuccessfulTransfer(ProcessContext context) {
+        final StateManager stateManager = context.getStateManager();
+        try {
+            final StateMap localStateMap = stateManager.getState(Scope.LOCAL);
+            return localStateMap.get(STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO);
+        } catch (IOException e) {
+            throw new ProcessException("Failed to load local state due to " + 
e, e);
+        }
+    }
+
+    private void synchronizeState(ProcessContext context) {
+        final ComponentLog logger = getLogger();
+        final boolean isConnectedToCluster = context.isConnectedToCluster();
+
+        if (isReconnectedToCluster(isConnectedToCluster)) {
+            localFlowActivityInfo.forceSync();
+            connectedWhenLastTriggered.set(true);
+        }
+        if (!isConnectedToCluster) {
+            connectedWhenLastTriggered.set(false);
+        } else if (localFlowActivityInfo.syncNeeded()) {
+            final CommonFlowActivityInfo commonFlowActivityInfo = new 
CommonFlowActivityInfo(context);
+            localFlowActivityInfo.update(commonFlowActivityInfo);
+
+            try {
+                commonFlowActivityInfo.update(localFlowActivityInfo);
+                localFlowActivityInfo.setNextSyncMillis();
+            } catch (final SaveSharedFlowStateException ex) {
+                logger.debug("Failed to update common state.", ex);
             }
+        }
+    }
 
-            if (sendInactiveMarker && shouldThisNodeReport(isClusterScope, 
shouldReportOnlyOnPrimary, context)) {
-                lastInactiveMessage.set(System.currentTimeMillis());
+    private void onTriggerInactiveFlow(ProcessContext context, ProcessSession 
session, boolean isClusterScope, long lastActivity) {
+        final ComponentLog logger = getLogger();
+        final boolean shouldReportOnlyOnPrimary = 
shouldReportOnlyOnPrimary(isClusterScope, context);
+        final boolean shouldThisNodeReport = 
shouldThisNodeReport(isClusterScope, shouldReportOnlyOnPrimary, context);
 
-                FlowFile inactiveFlowFile = session.create();
-                inactiveFlowFile = session.putAttribute(inactiveFlowFile, 
"inactivityStartMillis", String.valueOf(previousSuccessMillis));
-                inactiveFlowFile = session.putAttribute(inactiveFlowFile, 
"inactivityDurationMillis", String.valueOf(now - previousSuccessMillis));
+        if (shouldThisNodeReport) {
+            sendInactivityMarker(context, session, lastActivity, logger);
+        }
+        lastInactiveMessage.set(System.currentTimeMillis());
+        setInactivityFlag(context.getStateManager());
+    }
 
-                final byte[] outBytes = 
context.getProperty(INACTIVITY_MESSAGE).evaluateAttributeExpressions(inactiveFlowFile).getValue().getBytes(UTF8);
-                inactiveFlowFile = session.write(inactiveFlowFile, new 
OutputStreamCallback() {
-                    @Override
-                    public void process(final OutputStream out) throws 
IOException {
-                        out.write(outBytes);
-                    }
-                });
+    private void onTriggerActiveFlow(ProcessContext context, ProcessSession 
session, boolean wasActive, boolean isClusterScope,
+            long inactivityStartMillis, List<FlowFile> flowFiles) {
+        final ComponentLog logger = getLogger();
+        final boolean shouldReportOnlyOnPrimary = 
shouldReportOnlyOnPrimary(isClusterScope, context);
+        final boolean shouldThisNodeReport = 
shouldThisNodeReport(isClusterScope, shouldReportOnlyOnPrimary, context);
 
-                session.getProvenanceReporter().create(inactiveFlowFile);
-                session.transfer(inactiveFlowFile, REL_INACTIVE);
-                logger.info("Transferred {} to 'inactive'", new 
Object[]{inactiveFlowFile});
-            } else {
-                context.yield();    // no need to dominate CPU checking times; 
let other processors run for a bit.
+        if (!wasActive) {
+            if (shouldThisNodeReport) {

Review Comment:
   `shouldThisNodeReport` local variable could be moved into `if (!wasActive) { 
... }`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to