Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/575#discussion_r70279855
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java ---
    @@ -201,17 +253,57 @@ public void process(final OutputStream out) throws IOException {
                 }
             } else {
                 session.transfer(flowFiles, REL_SUCCESS);
    +            updatedLatestSuccessTransfer = now;
                 logger.info("Transferred {} FlowFiles to 'success'", new Object[]{flowFiles.size()});
     
    -            final long inactivityStartMillis = latestSuccessTransfer.getAndSet(now);
    +            final long latestStateReportTimestamp = latestReportedNodeState.get();
    +            if (SCOPE_CLUSTER.equals(monitoringScope)
    +                    && (now - latestStateReportTimestamp) > (thresholdMillis / 3)) {
    +                // We don't want to hit the state manager every onTrigger(), but often enough to detect activeness.
    +                try {
    +                    final StateManager stateManager = context.getStateManager();
    +                    final StateMap state = stateManager.getState(Scope.CLUSTER);
    +
    +                    final Map<String, String> newValues = new HashMap<>();
    +
    +                    // Persist attributes so that other nodes can copy them
    +                    if (copyAttributes) {
    +                        newValues.putAll(flowFiles.get(0).getAttributes());
    +                    }
    +                    newValues.put(STATE_KEY_LATEST_SUCCESS_TRANSFER, String.valueOf(now));
    +
    +                    if (state == null || state.getVersion() == -1) {
    +                        stateManager.setState(newValues, Scope.CLUSTER);
    +                    } else {
    +                        // If this returns false due to a race condition, it's not a problem since we just need
    +                        // the latest active timestamp.
    +                        stateManager.replace(state, newValues, Scope.CLUSTER);
    --- End diff --
    
    While that is usually the case, we can't assume that the new state has a later timestamp. It could be the case that one node is much more heavily taxed than another and takes longer to transmit its state back to the ZooKeeper cluster (where clustered state is stored). This should keep checking and attempting the replace until either the replace succeeds or the stored state is already later than the current timestamp.

