[
https://issues.apache.org/jira/browse/NIFI-619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372877#comment-15372877
]
ASF GitHub Bot commented on NIFI-619:
-------------------------------------
Github user ijokarumawak commented on a diff in the pull request:
https://github.com/apache/nifi/pull/575#discussion_r70437957
--- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java ---
@@ -201,17 +253,57 @@ public void process(final OutputStream out) throws IOException {
                 }
             } else {
                 session.transfer(flowFiles, REL_SUCCESS);
+                updatedLatestSuccessTransfer = now;
                 logger.info("Transferred {} FlowFiles to 'success'", new Object[]{flowFiles.size()});
-                final long inactivityStartMillis = latestSuccessTransfer.getAndSet(now);
+                final long latestStateReportTimestamp = latestReportedNodeState.get();
+                if (SCOPE_CLUSTER.equals(monitoringScope)
+                        && (now - latestStateReportTimestamp) > (thresholdMillis / 3)) {
+                    // We don't want to hit the state manager every onTrigger(), but often enough to detect activeness.
+                    try {
+                        final StateManager stateManager = context.getStateManager();
+                        final StateMap state = stateManager.getState(Scope.CLUSTER);
+
+                        final Map<String, String> newValues = new HashMap<>();
+
+                        // Persist attributes so that other nodes can copy it
+                        if (copyAttributes) {
+                            newValues.putAll(flowFiles.get(0).getAttributes());
+                        }
+                        newValues.put(STATE_KEY_LATEST_SUCCESS_TRANSFER, String.valueOf(now));
+
+                        if (state == null || state.getVersion() == -1) {
+                            stateManager.setState(newValues, Scope.CLUSTER);
+                        } else {
+                            // If this returns false due to race condition, it's not a problem since we just need
+                            // the latest active timestamp.
+                            stateManager.replace(state, newValues, Scope.CLUSTER);
--- End diff ---
Thanks for catching this. I'll add code to check whether the current timestamp
is newer than the one stored in ZooKeeper.
For retrying, I'd let the NiFi scheduler and onTrigger() manage that, instead
of adding a retry mechanism here, for simplicity and controllability. If a node
is trying to update state, that node is active, so it will report correctly.
The only problem I can imagine from not updating the state is the possibility
of other nodes emitting false alarms. But that is also inevitable if the other
nodes themselves are inactive and ZooKeeper is not working. Once the issue is
resolved, subsequent onTrigger() calls will write the state, and it will work
correctly eventually.
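To illustrate the approach discussed above, here is a minimal, self-contained Java sketch of the "write only if our timestamp is newer, and tolerate lost races" guard. `StateSnapshot`, `replace`, and `reportActivity` are hypothetical stand-ins for NiFi's `StateMap`/`StateManager` API, not the actual PR code; an `AtomicReference` simulates the version-checked replace that ZooKeeper-backed state provides.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public class OptimisticStateSketch {
    static final String STATE_KEY_LATEST_SUCCESS_TRANSFER = "latest.success.transfer";

    // Versioned snapshot standing in for NiFi's StateMap (hypothetical simplification).
    record StateSnapshot(long version, Map<String, String> values) {}

    private final AtomicReference<StateSnapshot> store =
            new AtomicReference<>(new StateSnapshot(-1, Map.of()));

    StateSnapshot getState() {
        return store.get();
    }

    // Mimics StateManager.replace(): succeeds only if nobody updated in between.
    boolean replace(StateSnapshot expected, Map<String, String> newValues) {
        StateSnapshot next = new StateSnapshot(expected.version() + 1, Map.copyOf(newValues));
        return store.compareAndSet(expected, next);
    }

    // The guard discussed in the review: only write when our timestamp is newer
    // than the stored one; a lost race is fine because any recent timestamp will do.
    boolean reportActivity(long now) {
        final StateSnapshot state = getState();
        final String stored = state.values().get(STATE_KEY_LATEST_SUCCESS_TRANSFER);
        final long storedTs = (stored == null) ? -1L : Long.parseLong(stored);
        if (now <= storedTs) {
            return false; // another node already reported an equal or newer timestamp
        }
        final Map<String, String> newValues = new HashMap<>(state.values());
        newValues.put(STATE_KEY_LATEST_SUCCESS_TRANSFER, String.valueOf(now));
        // If this fails due to a concurrent update, the next onTrigger() retries naturally.
        return replace(state, newValues);
    }
}
```

A failed `replace` is simply skipped here, matching the comment above: the next scheduled onTrigger() invocation supplies the retry, so no explicit retry loop is needed.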
> update MonitorActivity processor to be cluster friendly
> -------------------------------------------------------
>
> Key: NIFI-619
> URL: https://issues.apache.org/jira/browse/NIFI-619
> Project: Apache NiFi
> Issue Type: Improvement
> Reporter: Brandon DeVries
> Assignee: Koji Kawamura
> Priority: Minor
> Fix For: 1.0.0
>
>
> This processor should be able to be used to monitor activity across the
> cluster. In its current state, alerting is based on activity of a single
> node, not the entire cluster.
> For example, in a 2 node cluster, if system A is getting data from a given
> flow and system B is not, system B will alert for lack of activity even
> though the flow is functioning "normally".
> The ideal behavior would be for an alert to be generated only if both
> systems did not see data within the specified time.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)