GitHub user ijokarumawak commented on a diff in the pull request:
https://github.com/apache/nifi/pull/575#discussion_r70437957
--- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java ---
@@ -201,17 +253,57 @@ public void process(final OutputStream out) throws IOException {
                 }
             } else {
                 session.transfer(flowFiles, REL_SUCCESS);
+                updatedLatestSuccessTransfer = now;
                 logger.info("Transferred {} FlowFiles to 'success'", new Object[]{flowFiles.size()});
-                final long inactivityStartMillis = latestSuccessTransfer.getAndSet(now);
+                final long latestStateReportTimestamp = latestReportedNodeState.get();
+                if (SCOPE_CLUSTER.equals(monitoringScope)
+                        && (now - latestStateReportTimestamp) > (thresholdMillis / 3)) {
+                    // We don't want to hit the state manager every onTrigger(), but often enough to detect activeness.
+                    try {
+                        final StateManager stateManager = context.getStateManager();
+                        final StateMap state = stateManager.getState(Scope.CLUSTER);
+
+                        final Map<String, String> newValues = new HashMap<>();
+
+                        // Persist attributes so that other nodes can copy it
+                        if (copyAttributes) {
+                            newValues.putAll(flowFiles.get(0).getAttributes());
+                        }
+                        newValues.put(STATE_KEY_LATEST_SUCCESS_TRANSFER, String.valueOf(now));
+
+                        if (state == null || state.getVersion() == -1) {
+                            stateManager.setState(newValues, Scope.CLUSTER);
+                        } else {
+                            // If this returns false due to race condition, it's not a problem since we just need
+                            // the latest active timestamp.
+                            stateManager.replace(state, newValues, Scope.CLUSTER);
--- End diff --
Thanks for catching this. I'll add code to check whether the current timestamp
is newer than the one stored in ZooKeeper.
As for retrying, I'd rather let the NiFi scheduler and onTrigger() handle that
than add a retry mechanism here, for simplicity and controllability. If the node
is trying to update state, the node is active, so it will report correctly. The
only problem I can imagine from a failed state update is that other nodes might
emit false alarms, but that is also inevitable if those nodes themselves are
inactive or ZooKeeper is not working. Once the issue is resolved, subsequent
onTrigger() calls will write the state, and it will work correctly eventually.
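To illustrate the "only write if the current timestamp is newer" check discussed above, here is a minimal, self-contained sketch of the optimistic-update pattern. It uses a plain AtomicLong as a stand-in for the value kept under STATE_KEY_LATEST_SUCCESS_TRANSFER; in MonitorActivity the real store is the cluster StateManager, and the class and method names here are hypothetical, for illustration only.

```java
import java.util.concurrent.atomic.AtomicLong;

public class LatestTimestampUpdater {

    // Stand-in for the cluster-wide latest-success-transfer timestamp.
    // -1 plays the role of "no state yet" (like StateMap.getVersion() == -1).
    private final AtomicLong storedTimestamp = new AtomicLong(-1L);

    /**
     * Attempts to record {@code now} as the latest activity timestamp.
     * Returns false when the stored value is already newer, so a stale
     * writer never overwrites a more recent report from another node.
     */
    public boolean updateIfNewer(final long now) {
        while (true) {
            final long current = storedTimestamp.get();
            if (now <= current) {
                // Another node already reported equal or newer activity.
                return false;
            }
            if (storedTimestamp.compareAndSet(current, now)) {
                return true;
            }
            // Lost the race; loop and re-check, analogous to how a failed
            // StateManager.replace() is simply retried on a later onTrigger().
        }
    }

    public long get() {
        return storedTimestamp.get();
    }
}
```

As in the PR comment, a lost race is harmless here: the loser either observes a newer timestamp and gives up, or retries, and in both cases the stored value only ever moves forward.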
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---