Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/575#discussion_r70437957
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java ---
    @@ -201,17 +253,57 @@ public void process(final OutputStream out) throws IOException {
                 }
             } else {
                 session.transfer(flowFiles, REL_SUCCESS);
    +            updatedLatestSuccessTransfer = now;
                 logger.info("Transferred {} FlowFiles to 'success'", new Object[]{flowFiles.size()});
     
    -            final long inactivityStartMillis = latestSuccessTransfer.getAndSet(now);
    +            final long latestStateReportTimestamp = latestReportedNodeState.get();
    +            if (SCOPE_CLUSTER.equals(monitoringScope)
    +                    && (now - latestStateReportTimestamp) > (thresholdMillis / 3)) {
    +                // We don't want to hit the state manager every onTrigger(), but often enough to detect activeness.
    +                try {
    +                    final StateManager stateManager = context.getStateManager();
    +                    final StateMap state = stateManager.getState(Scope.CLUSTER);
    +
    +                    final Map<String, String> newValues = new HashMap<>();
    +
    +                    // Persist attributes so that other nodes can copy it
    +                    if (copyAttributes) {
    +                        newValues.putAll(flowFiles.get(0).getAttributes());
    +                    }
    +                    newValues.put(STATE_KEY_LATEST_SUCCESS_TRANSFER, String.valueOf(now));
    +
    +                    if (state == null || state.getVersion() == -1) {
    +                        stateManager.setState(newValues, Scope.CLUSTER);
    +                    } else {
    +                        // If this returns false due to race condition, it's not a problem since we just need
    +                        // the latest active timestamp.
    +                        stateManager.replace(state, newValues, Scope.CLUSTER);
    --- End diff --
    
    Thanks for catching this. I'll add code to check whether the current timestamp is newer than the one stored in ZooKeeper.
    
    As for retrying, I'd rather let the NiFi scheduler and onTrigger() handle it than add a retry mechanism here, for simplicity and controllability. If the node is trying to update the state, it is active, so it will be reported correctly. The only problem I can imagine from a failed state update is that other nodes might emit false alarms. But that is also inevitable if those nodes themselves are inactive or ZooKeeper is not working. Once the issue is resolved, a subsequent onTrigger() call will write the state and it will work correctly eventually.
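    The "only write if newer" check mentioned above could be sketched as below. This is a minimal illustration using plain java.util types, not the actual NiFi StateManager API; the `shouldReplaceState` helper and surrounding class are hypothetical, while the state key name mirrors the one in the diff.
    
    ```java
    import java.util.HashMap;
    import java.util.Map;
    
    // Hypothetical sketch: decide whether a locally observed "latest success
    // transfer" timestamp should overwrite the one stored in cluster state.
    // The real code would still perform the write via StateManager.replace()
    // so that concurrent updates are resolved optimistically.
    public class LatestTransferCheck {
        static final String STATE_KEY_LATEST_SUCCESS_TRANSFER = "latestSuccessTransfer";
    
        static boolean shouldReplaceState(final Map<String, String> storedState, final long now) {
            final String stored = storedState.get(STATE_KEY_LATEST_SUCCESS_TRANSFER);
            if (stored == null) {
                return true; // nothing stored yet, safe to write
            }
            // Only write when the local timestamp is strictly newer, so a node
            // with a stale clock or delayed trigger cannot move the cluster-wide
            // "latest activity" timestamp backwards.
            return now > Long.parseLong(stored);
        }
    
        public static void main(final String[] args) {
            final Map<String, String> state = new HashMap<>();
            state.put(STATE_KEY_LATEST_SUCCESS_TRANSFER, "1000");
    
            System.out.println(shouldReplaceState(state, 2000));          // newer -> true
            System.out.println(shouldReplaceState(state, 500));           // older -> false
            System.out.println(shouldReplaceState(new HashMap<>(), 500)); // empty state -> true
        }
    }
    ```
    
    If replace() then fails because another node won the race, the loser can simply skip the write, since the winner's timestamp already proves cluster-wide activity.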

