[
https://issues.apache.org/jira/browse/NIFI-619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15370971#comment-15370971
]
ASF GitHub Bot commented on NIFI-619:
-------------------------------------
Github user JPercivall commented on a diff in the pull request:
https://github.com/apache/nifi/pull/575#discussion_r70279855
--- Diff:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
---
@@ -201,17 +253,57 @@ public void process(final OutputStream out) throws
IOException {
}
} else {
session.transfer(flowFiles, REL_SUCCESS);
+ updatedLatestSuccessTransfer = now;
logger.info("Transferred {} FlowFiles to 'success'", new
Object[]{flowFiles.size()});
- final long inactivityStartMillis =
latestSuccessTransfer.getAndSet(now);
+ final long latestStateReportTimestamp =
latestReportedNodeState.get();
+ if (SCOPE_CLUSTER.equals(monitoringScope)
+ && (now - latestStateReportTimestamp) >
(thresholdMillis / 3)) {
+ // We don't want to hit the state manager every
onTrigger(), but often enough to detect activeness.
+ try {
+ final StateManager stateManager =
context.getStateManager();
+ final StateMap state =
stateManager.getState(Scope.CLUSTER);
+
+ final Map<String, String> newValues = new HashMap<>();
+
+ // Persist attributes so that other nodes can copy it
+ if (copyAttributes) {
+ newValues.putAll(flowFiles.get(0).getAttributes());
+ }
+ newValues.put(STATE_KEY_LATEST_SUCCESS_TRANSFER,
String.valueOf(now));
+
+ if (state == null || state.getVersion() == -1) {
+ stateManager.setState(newValues, Scope.CLUSTER);
+ } else {
+ // If this returns false due to race condition,
it's not a problem since we just need
+ // the latest active timestamp.
+ stateManager.replace(state, newValues,
Scope.CLUSTER);
--- End diff --
While it usually is the case, we can't assume that the new state has a
later timestamp. It could be case that one node is much more taxed than another
and it takes longer to transmit state back to the ZooKeeper cluster (where
clustered state is stored). This should continually check/attempt to replace
until it succeeds or the stored state is later than the current timestamp.
> update MonitorActivity processor to be cluster friendly
> -------------------------------------------------------
>
> Key: NIFI-619
> URL: https://issues.apache.org/jira/browse/NIFI-619
> Project: Apache NiFi
> Issue Type: Improvement
> Reporter: Brandon DeVries
> Assignee: Koji Kawamura
> Priority: Minor
> Fix For: 1.0.0
>
>
> This processor should be able to be used to monitor activity across the
> cluster. In its current state, alerting is based on activity of a single
> node, not the entire cluster.
> For example, in a 2 node cluster, if system A is getting data from a given
> flow and system B is not, system B will alert for lack of activity even
> though the flow is functioning "normally".
> The ideal behavior would be fore an alert to be generated only if both
> systems did not see data in the specified time.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)