turcsanyip commented on code in PR #8669:
URL: https://github.com/apache/nifi/pull/8669#discussion_r1592885962
##########
nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java:
##########
@@ -447,25 +457,202 @@ private boolean isReconnectedToCluster( final boolean
isConnectedToCluster) {
return !connectedWhenLastTriggered.get() && isConnectedToCluster;
}
- private void reconcileState(final ProcessContext context) {
- try {
- final StateMap state =
context.getStateManager().getState(Scope.CLUSTER);
- final Map<String, String> newState = new HashMap<>();
- newState.putAll(state.toMap());
+ private boolean shouldThisNodeReport(final boolean isClusterScope, final
boolean isReportOnlyOnPrimary, final ProcessContext context) {
+ return !isClusterScope || ((!isReportOnlyOnPrimary ||
getNodeTypeProvider().isPrimary()) && context.isConnectedToCluster());
+ }
+
+ private void sendInactivityMarker(ProcessContext context, ProcessSession
session, long inactivityStartMillis,
+ ComponentLog logger) {
+ FlowFile inactiveFlowFile = session.create();
+ inactiveFlowFile = session.putAttribute(
+ inactiveFlowFile,
+ "inactivityStartMillis", String.valueOf(inactivityStartMillis)
+ );
+ inactiveFlowFile = session.putAttribute(
+ inactiveFlowFile,
+ "inactivityDurationMillis",
+ String.valueOf(System.currentTimeMillis() -
inactivityStartMillis)
+ );
+
+ final byte[] outBytes =
context.getProperty(INACTIVITY_MESSAGE).evaluateAttributeExpressions(inactiveFlowFile).getValue().getBytes(
+ StandardCharsets.UTF_8);
+ inactiveFlowFile = session.write(inactiveFlowFile, out ->
out.write(outBytes));
+
+ session.getProvenanceReporter().create(inactiveFlowFile);
+ session.transfer(inactiveFlowFile, REL_INACTIVE);
+ logger.info("Transferred {} to 'inactive'", inactiveFlowFile);
+ }
- final long validLastSuccessTransfer =
StringUtils.isEmpty(state.get(STATE_KEY_LATEST_SUCCESS_TRANSFER))
- ? latestSuccessTransfer.get()
- :
Math.max(Long.valueOf(state.get(STATE_KEY_LATEST_SUCCESS_TRANSFER)),
latestSuccessTransfer.get());
+ private void sendActivationMarker(ProcessContext context, ProcessSession
session, Map<String, String> attributes,
+ long inactivityStartMillis, ComponentLog logger) {
+ FlowFile activityRestoredFlowFile = session.create();
+ // don't copy the UUID
+ attributes.remove(CoreAttributes.UUID.key());
+ activityRestoredFlowFile =
session.putAllAttributes(activityRestoredFlowFile, attributes);
- newState.put(STATE_KEY_LATEST_SUCCESS_TRANSFER,
String.valueOf(validLastSuccessTransfer));
- context.getStateManager().replace(state, newState, Scope.CLUSTER);
- } catch (IOException e) {
- getLogger().error("Could not reconcile state after (re)connection!
Reason: " + e.getMessage());
- throw new ProcessException(e);
+ activityRestoredFlowFile =
session.putAttribute(activityRestoredFlowFile, "inactivityStartMillis",
String.valueOf(
+ inactivityStartMillis));
+ activityRestoredFlowFile =
session.putAttribute(activityRestoredFlowFile, "inactivityDurationMillis",
String.valueOf(System.currentTimeMillis() - inactivityStartMillis));
+
+ final byte[] outBytes =
context.getProperty(ACTIVITY_RESTORED_MESSAGE).evaluateAttributeExpressions(activityRestoredFlowFile).getValue().getBytes(
+ StandardCharsets.UTF_8);
+ activityRestoredFlowFile = session.write(activityRestoredFlowFile, out
-> out.write(outBytes));
+
+ session.getProvenanceReporter().create(activityRestoredFlowFile);
+ session.transfer(activityRestoredFlowFile, REL_ACTIVITY_RESTORED);
+ logger.info("Transferred {} to 'activity.restored'",
activityRestoredFlowFile);
+ }
+
+ private static class LocalFlowActivityInfo {
+ private static final long NO_VALUE = 0;
+
+ private final long startupTimeMillis;
+ private final long thresholdMillis;
+ private final boolean saveAttributes;
+
+ private long nextSyncMillis = NO_VALUE;
+ private long lastSuccessfulTransfer = NO_VALUE;
+ private Map<String, String> lastSuccessfulTransferAttributes = new
HashMap<>();
+
+ public LocalFlowActivityInfo(long startupTimeMillis, long
thresholdMillis, boolean saveAttributes) {
+ this.startupTimeMillis = startupTimeMillis;
+ this.thresholdMillis = thresholdMillis;
+ this.saveAttributes = saveAttributes;
+ }
+
+ public LocalFlowActivityInfo(long startupTimeMillis, long
thresholdMillis, boolean saveAttributes, long initialLastSuccessfulTransfer) {
+ this(startupTimeMillis, thresholdMillis, saveAttributes);
+ lastSuccessfulTransfer = initialLastSuccessfulTransfer;
+ }
+
+ public synchronized boolean syncNeeded() {
Review Comment:
The whole execution is single threaded due to `@TriggerSerially`. Therefore
`synchronized` is not needed and can be removed.
##########
nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java:
##########
@@ -217,224 +230,221 @@ protected List<PropertyDescriptor>
getSupportedPropertyDescriptors() {
public void onScheduled(final ProcessContext context) {
// Check configuration.
isClusterScope(context, true);
- resetLastSuccessfulTransfer();
- inactive.set(false);
- hasSuccessTransfer.set(false);
- }
-
-
- protected void resetLastSuccessfulTransfer() {
- setLastSuccessfulTransfer(System.currentTimeMillis());
- }
- protected final void setLastSuccessfulTransfer(final long timestamp) {
- latestSuccessTransfer.set(timestamp);
- latestReportedNodeState.set(timestamp);
- }
-
- protected final long getLatestSuccessTransfer() {
- return latestSuccessTransfer.get();
- }
-
- private boolean isClusterScope(final ProcessContext context, boolean
logInvalidConfig) {
- if
(SCOPE_CLUSTER.equals(context.getProperty(MONITORING_SCOPE).getValue())) {
- if (getNodeTypeProvider().isConfiguredForClustering()) {
- return true;
- }
- if (logInvalidConfig) {
- getLogger().warn("NiFi is running as a Standalone mode, but
'cluster' scope is set." +
- " Fallback to 'node' scope. Fix configuration to stop
this message.");
- }
+ final long thresholdMillis =
context.getProperty(THRESHOLD).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+ final boolean copyAttributes =
context.getProperty(COPY_ATTRIBUTES).asBoolean();
+ final boolean resetStateOnRestart =
context.getProperty(RESET_STATE_ON_RESTART).asBoolean();
+
+ // Attempt to load last state by the time of stopping this processor.
A local state only exists if
+ // the monitored flow was already inactive, when the processor was
shutting down.
+ final String storedLastSuccessfulTransfer = resetStateOnRestart ? null
: tryLoadLastSuccessfulTransfer(context);
+
+ if (storedLastSuccessfulTransfer != null) {
+ // Initialize local flow as being inactive since the stored
timestamp.
+ localFlowActivityInfo = new LocalFlowActivityInfo(
+ getStartupTime(), thresholdMillis, copyAttributes,
Long.parseLong(storedLastSuccessfulTransfer));
+ wasActive.set(localFlowActivityInfo.isActive());
+ inactivityStartMillis.set(localFlowActivityInfo.getLastActivity());
+ } else {
+ // Initialize local flow as being active. If there is no traffic,
then it will eventually become inactive.
+ localFlowActivityInfo = new LocalFlowActivityInfo(
+ getStartupTime(), thresholdMillis, copyAttributes);
+ wasActive.set(true);
}
- return false;
}
- private boolean shouldReportOnlyOnPrimary(boolean isClusterScope, final
ProcessContext context) {
- if
(REPORT_NODE_PRIMARY.equals(context.getProperty(REPORTING_NODE).getValue())) {
- if (isClusterScope) {
- return true;
+ @OnStopped
+ public void onStopped(final ProcessContext context) {
+ if (getNodeTypeProvider().isConfiguredForClustering()) {
+ // Shared state needs to be cleared, in order to avoid getting
inactive markers right after starting the
+ // flow after a weekend stop. In single-node setup, there is no
shared state to be cleared, but the line
+ // below would also wipe out the local state. Hence, the check.
+ final StateManager stateManager = context.getStateManager();
+ try {
+ stateManager.clear(Scope.CLUSTER);
+ } catch (IOException e) {
+ getLogger().error("Failed to clear cluster state due to " + e,
e);
}
}
- return false;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) {
- final long thresholdMillis =
context.getProperty(THRESHOLD).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
- final long now = System.currentTimeMillis();
-
final ComponentLog logger = getLogger();
- final boolean copyAttributes =
context.getProperty(COPY_ATTRIBUTES).asBoolean();
- final boolean waitForActivity =
context.getProperty(WAIT_FOR_ACTIVITY).asBoolean();
+
final boolean isClusterScope = isClusterScope(context, false);
final boolean isConnectedToCluster = context.isConnectedToCluster();
- final boolean shouldReportOnlyOnPrimary =
shouldReportOnlyOnPrimary(isClusterScope, context);
+ final boolean wasActive = this.wasActive.get();
+
final List<FlowFile> flowFiles = session.get(50);
- if (isClusterScope(context, true)) {
- if (isReconnectedToCluster(isConnectedToCluster)) {
- reconcileState(context);
- connectedWhenLastTriggered.set(true);
- } else if (!isConnectedToCluster) {
- connectedWhenLastTriggered.set(false);
+ if (!flowFiles.isEmpty()) {
+ final boolean firstKnownTransfer =
!localFlowActivityInfo.hasSuccessfulTransfer();
+ final boolean flowStateMustBecomeActive = !wasActive ||
firstKnownTransfer;
+
+ localFlowActivityInfo.update(flowFiles.get(0));
+
+ if (isClusterScope && flowStateMustBecomeActive) {
+ localFlowActivityInfo.forceSync();
}
}
- boolean isInactive = false;
- long updatedLatestSuccessTransfer = -1;
- StateMap clusterState = null;
-
- if (flowFiles.isEmpty()) {
- final long previousSuccessMillis = latestSuccessTransfer.get();
-
- boolean sendInactiveMarker = false;
-
- isInactive = (now >= previousSuccessMillis + thresholdMillis);
- logger.debug("isInactive={}, previousSuccessMillis={}, now={}",
new Object[]{isInactive, previousSuccessMillis, now});
- if (isInactive && isClusterScope && isConnectedToCluster) {
- // Even if this node has been inactive, there may be other
nodes handling flow actively.
- // However, if this node is active, we don't have to look at
cluster state.
- try {
- clusterState = session.getState(Scope.CLUSTER);
- if (clusterState != null &&
!StringUtils.isEmpty(clusterState.get(STATE_KEY_LATEST_SUCCESS_TRANSFER))) {
- final long latestReportedClusterActivity =
Long.valueOf(clusterState.get(STATE_KEY_LATEST_SUCCESS_TRANSFER));
- isInactive = (now >= latestReportedClusterActivity +
thresholdMillis);
- if (!isInactive) {
- // This node has been inactive, but other node has
more recent activity.
- updatedLatestSuccessTransfer =
latestReportedClusterActivity;
- }
- logger.debug("isInactive={},
latestReportedClusterActivity={}", new Object[]{isInactive,
latestReportedClusterActivity});
- }
- } catch (IOException e) {
- logger.error("Failed to access cluster state. Activity
will not be monitored properly until this is addressed.", e);
- }
+ if (isClusterScope) {
+ if (wasActive && !localFlowActivityInfo.isActive()) {
+ localFlowActivityInfo.forceSync();
}
+ synchronizeState(context);
+ }
+
+ final long thresholdMillis =
context.getProperty(THRESHOLD).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+ final boolean continuallySendMessages =
context.getProperty(CONTINUALLY_SEND_MESSAGES).asBoolean();
+ final boolean waitForActivity =
context.getProperty(WAIT_FOR_ACTIVITY).asBoolean();
- if (isInactive) {
- final boolean continual =
context.getProperty(CONTINUALLY_SEND_MESSAGES).asBoolean();
- sendInactiveMarker = !inactive.getAndSet(true) || (continual
&& (now > lastInactiveMessage.get() + thresholdMillis));
- if (waitForActivity) {
- sendInactiveMarker = sendInactiveMarker &&
hasSuccessTransfer.get();
- }
+ final boolean isActive = localFlowActivityInfo.isActive() ||
!flowFiles.isEmpty();
+ final long lastActivity = localFlowActivityInfo.getLastActivity();
+ final long inactivityStartMillis = this.inactivityStartMillis.get();
+ final boolean timeToRepeatInactiveMessage = (lastInactiveMessage.get()
+ thresholdMillis) <= System.currentTimeMillis();
+
+ final boolean canBecomeInactive = (!isClusterScope ||
isConnectedToCluster)
+ && (!waitForActivity ||
localFlowActivityInfo.hasSuccessfulTransfer());
+
+ if (isActive) {
+ onTriggerActiveFlow(context, session, wasActive, isClusterScope,
inactivityStartMillis, flowFiles);
+ } else if (canBecomeInactive && (wasActive || (continuallySendMessages
&& timeToRepeatInactiveMessage))) {
+ onTriggerInactiveFlow(context, session, isClusterScope,
lastActivity);
+ } else {
+ context.yield(); // no need to dominate CPU checking times; let
other processors run for a bit.
+ }
+
+ if (wasActive && !canBecomeInactive) {
+ // We need to block ACTIVE -> INACTIVE state transition, because
we are not connected to the cluster.
+ // When we reconnect, and the INACTIVE state persists, then the
next onTrigger will do the transition.
+ logger.trace("ACTIVE->INACTIVE transition is blocked, because we
are not connected to the cluster.");
+ } else {
+ this.wasActive.set(isActive);
+ this.inactivityStartMillis.set(lastActivity);
+ }
+ }
+
+ protected long getStartupTime() {
+ return System.currentTimeMillis();
+ }
+
+ protected final long getLatestSuccessTransfer() {
+ return localFlowActivityInfo.getLastSuccessfulTransfer();
+ }
+
+ private String tryLoadLastSuccessfulTransfer(ProcessContext context) {
+ final StateManager stateManager = context.getStateManager();
+ try {
+ final StateMap localStateMap = stateManager.getState(Scope.LOCAL);
+ return localStateMap.get(STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO);
+ } catch (IOException e) {
+ throw new ProcessException("Failed to load local state due to " +
e, e);
+ }
+ }
+
+ private void synchronizeState(ProcessContext context) {
+ final ComponentLog logger = getLogger();
+ final boolean isConnectedToCluster = context.isConnectedToCluster();
+
+ if (isReconnectedToCluster(isConnectedToCluster)) {
+ localFlowActivityInfo.forceSync();
+ connectedWhenLastTriggered.set(true);
+ }
+ if (!isConnectedToCluster) {
+ connectedWhenLastTriggered.set(false);
+ } else if (localFlowActivityInfo.syncNeeded()) {
+ final CommonFlowActivityInfo commonFlowActivityInfo = new
CommonFlowActivityInfo(context);
+ localFlowActivityInfo.update(commonFlowActivityInfo);
+
+ try {
+ commonFlowActivityInfo.update(localFlowActivityInfo);
+ localFlowActivityInfo.setNextSyncMillis();
+ } catch (final SaveSharedFlowStateException ex) {
+ logger.debug("Failed to update common state.", ex);
}
+ }
+ }
- if (sendInactiveMarker && shouldThisNodeReport(isClusterScope,
shouldReportOnlyOnPrimary, context)) {
- lastInactiveMessage.set(System.currentTimeMillis());
+ private void onTriggerInactiveFlow(ProcessContext context, ProcessSession
session, boolean isClusterScope, long lastActivity) {
+ final ComponentLog logger = getLogger();
+ final boolean shouldReportOnlyOnPrimary =
shouldReportOnlyOnPrimary(isClusterScope, context);
+ final boolean shouldThisNodeReport =
shouldThisNodeReport(isClusterScope, shouldReportOnlyOnPrimary, context);
Review Comment:
`shouldThisNodeReport()` could call `shouldReportOnlyOnPrimary()` internally
because all the parameters are passed to that method too and
`shouldReportOnlyOnPrimary` local variable is not used anywhere else.
Also in `onTriggerActiveFlow()`.
##########
nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java:
##########
@@ -217,224 +230,221 @@ protected List<PropertyDescriptor>
getSupportedPropertyDescriptors() {
public void onScheduled(final ProcessContext context) {
// Check configuration.
isClusterScope(context, true);
- resetLastSuccessfulTransfer();
- inactive.set(false);
- hasSuccessTransfer.set(false);
- }
-
-
- protected void resetLastSuccessfulTransfer() {
- setLastSuccessfulTransfer(System.currentTimeMillis());
- }
- protected final void setLastSuccessfulTransfer(final long timestamp) {
- latestSuccessTransfer.set(timestamp);
- latestReportedNodeState.set(timestamp);
- }
-
- protected final long getLatestSuccessTransfer() {
- return latestSuccessTransfer.get();
- }
-
- private boolean isClusterScope(final ProcessContext context, boolean
logInvalidConfig) {
- if
(SCOPE_CLUSTER.equals(context.getProperty(MONITORING_SCOPE).getValue())) {
- if (getNodeTypeProvider().isConfiguredForClustering()) {
- return true;
- }
- if (logInvalidConfig) {
- getLogger().warn("NiFi is running as a Standalone mode, but
'cluster' scope is set." +
- " Fallback to 'node' scope. Fix configuration to stop
this message.");
- }
+ final long thresholdMillis =
context.getProperty(THRESHOLD).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+ final boolean copyAttributes =
context.getProperty(COPY_ATTRIBUTES).asBoolean();
+ final boolean resetStateOnRestart =
context.getProperty(RESET_STATE_ON_RESTART).asBoolean();
+
+ // Attempt to load last state by the time of stopping this processor.
A local state only exists if
+ // the monitored flow was already inactive, when the processor was
shutting down.
+ final String storedLastSuccessfulTransfer = resetStateOnRestart ? null
: tryLoadLastSuccessfulTransfer(context);
+
+ if (storedLastSuccessfulTransfer != null) {
+ // Initialize local flow as being inactive since the stored
timestamp.
+ localFlowActivityInfo = new LocalFlowActivityInfo(
+ getStartupTime(), thresholdMillis, copyAttributes,
Long.parseLong(storedLastSuccessfulTransfer));
+ wasActive.set(localFlowActivityInfo.isActive());
+ inactivityStartMillis.set(localFlowActivityInfo.getLastActivity());
+ } else {
+ // Initialize local flow as being active. If there is no traffic,
then it will eventually become inactive.
+ localFlowActivityInfo = new LocalFlowActivityInfo(
+ getStartupTime(), thresholdMillis, copyAttributes);
+ wasActive.set(true);
}
- return false;
}
- private boolean shouldReportOnlyOnPrimary(boolean isClusterScope, final
ProcessContext context) {
- if
(REPORT_NODE_PRIMARY.equals(context.getProperty(REPORTING_NODE).getValue())) {
- if (isClusterScope) {
- return true;
+ @OnStopped
+ public void onStopped(final ProcessContext context) {
+ if (getNodeTypeProvider().isConfiguredForClustering()) {
+ // Shared state needs to be cleared, in order to avoid getting
inactive markers right after starting the
+ // flow after a weekend stop. In single-node setup, there is no
shared state to be cleared, but the line
+ // below would also wipe out the local state. Hence, the check.
+ final StateManager stateManager = context.getStateManager();
+ try {
+ stateManager.clear(Scope.CLUSTER);
+ } catch (IOException e) {
+ getLogger().error("Failed to clear cluster state due to " + e,
e);
}
}
- return false;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) {
- final long thresholdMillis =
context.getProperty(THRESHOLD).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
- final long now = System.currentTimeMillis();
-
final ComponentLog logger = getLogger();
- final boolean copyAttributes =
context.getProperty(COPY_ATTRIBUTES).asBoolean();
- final boolean waitForActivity =
context.getProperty(WAIT_FOR_ACTIVITY).asBoolean();
+
final boolean isClusterScope = isClusterScope(context, false);
final boolean isConnectedToCluster = context.isConnectedToCluster();
- final boolean shouldReportOnlyOnPrimary =
shouldReportOnlyOnPrimary(isClusterScope, context);
+ final boolean wasActive = this.wasActive.get();
+
final List<FlowFile> flowFiles = session.get(50);
- if (isClusterScope(context, true)) {
- if (isReconnectedToCluster(isConnectedToCluster)) {
- reconcileState(context);
- connectedWhenLastTriggered.set(true);
- } else if (!isConnectedToCluster) {
- connectedWhenLastTriggered.set(false);
+ if (!flowFiles.isEmpty()) {
+ final boolean firstKnownTransfer =
!localFlowActivityInfo.hasSuccessfulTransfer();
+ final boolean flowStateMustBecomeActive = !wasActive ||
firstKnownTransfer;
+
+ localFlowActivityInfo.update(flowFiles.get(0));
+
+ if (isClusterScope && flowStateMustBecomeActive) {
+ localFlowActivityInfo.forceSync();
}
}
- boolean isInactive = false;
- long updatedLatestSuccessTransfer = -1;
- StateMap clusterState = null;
-
- if (flowFiles.isEmpty()) {
- final long previousSuccessMillis = latestSuccessTransfer.get();
-
- boolean sendInactiveMarker = false;
-
- isInactive = (now >= previousSuccessMillis + thresholdMillis);
- logger.debug("isInactive={}, previousSuccessMillis={}, now={}",
new Object[]{isInactive, previousSuccessMillis, now});
- if (isInactive && isClusterScope && isConnectedToCluster) {
- // Even if this node has been inactive, there may be other
nodes handling flow actively.
- // However, if this node is active, we don't have to look at
cluster state.
- try {
- clusterState = session.getState(Scope.CLUSTER);
- if (clusterState != null &&
!StringUtils.isEmpty(clusterState.get(STATE_KEY_LATEST_SUCCESS_TRANSFER))) {
- final long latestReportedClusterActivity =
Long.valueOf(clusterState.get(STATE_KEY_LATEST_SUCCESS_TRANSFER));
- isInactive = (now >= latestReportedClusterActivity +
thresholdMillis);
- if (!isInactive) {
- // This node has been inactive, but other node has
more recent activity.
- updatedLatestSuccessTransfer =
latestReportedClusterActivity;
- }
- logger.debug("isInactive={},
latestReportedClusterActivity={}", new Object[]{isInactive,
latestReportedClusterActivity});
- }
- } catch (IOException e) {
- logger.error("Failed to access cluster state. Activity
will not be monitored properly until this is addressed.", e);
- }
+ if (isClusterScope) {
+ if (wasActive && !localFlowActivityInfo.isActive()) {
+ localFlowActivityInfo.forceSync();
}
+ synchronizeState(context);
+ }
+
+ final long thresholdMillis =
context.getProperty(THRESHOLD).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+ final boolean continuallySendMessages =
context.getProperty(CONTINUALLY_SEND_MESSAGES).asBoolean();
+ final boolean waitForActivity =
context.getProperty(WAIT_FOR_ACTIVITY).asBoolean();
- if (isInactive) {
- final boolean continual =
context.getProperty(CONTINUALLY_SEND_MESSAGES).asBoolean();
- sendInactiveMarker = !inactive.getAndSet(true) || (continual
&& (now > lastInactiveMessage.get() + thresholdMillis));
- if (waitForActivity) {
- sendInactiveMarker = sendInactiveMarker &&
hasSuccessTransfer.get();
- }
+ final boolean isActive = localFlowActivityInfo.isActive() ||
!flowFiles.isEmpty();
+ final long lastActivity = localFlowActivityInfo.getLastActivity();
+ final long inactivityStartMillis = this.inactivityStartMillis.get();
+ final boolean timeToRepeatInactiveMessage = (lastInactiveMessage.get()
+ thresholdMillis) <= System.currentTimeMillis();
+
+ final boolean canBecomeInactive = (!isClusterScope ||
isConnectedToCluster)
+ && (!waitForActivity ||
localFlowActivityInfo.hasSuccessfulTransfer());
+
+ if (isActive) {
+ onTriggerActiveFlow(context, session, wasActive, isClusterScope,
inactivityStartMillis, flowFiles);
+ } else if (canBecomeInactive && (wasActive || (continuallySendMessages
&& timeToRepeatInactiveMessage))) {
+ onTriggerInactiveFlow(context, session, isClusterScope,
lastActivity);
+ } else {
+ context.yield(); // no need to dominate CPU checking times; let
other processors run for a bit.
+ }
+
+ if (wasActive && !canBecomeInactive) {
+ // We need to block ACTIVE -> INACTIVE state transition, because
we are not connected to the cluster.
+ // When we reconnect, and the INACTIVE state persists, then the
next onTrigger will do the transition.
+ logger.trace("ACTIVE->INACTIVE transition is blocked, because we
are not connected to the cluster.");
+ } else {
+ this.wasActive.set(isActive);
+ this.inactivityStartMillis.set(lastActivity);
+ }
+ }
+
+ protected long getStartupTime() {
+ return System.currentTimeMillis();
+ }
+
+ protected final long getLatestSuccessTransfer() {
+ return localFlowActivityInfo.getLastSuccessfulTransfer();
+ }
Review Comment:
```suggestion
protected final long getLastSuccessfulTransfer() {
return localFlowActivityInfo.getLastSuccessfulTransfer();
}
```
##########
nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java:
##########
@@ -72,10 +70,18 @@
@WritesAttributes({
@WritesAttribute(attribute = "inactivityStartMillis", description = "The
time at which Inactivity began, in the form of milliseconds since Epoch"),
@WritesAttribute(attribute = "inactivityDurationMillis", description =
"The number of milliseconds that the inactivity has spanned")})
-@Stateful(scopes = Scope.CLUSTER, description = "MonitorActivity stores the
last timestamp at each node as state, so that it can examine activity at
cluster wide." +
- "If 'Copy Attribute' is set to true, then flow file attributes are
also persisted.")
+@Stateful(
+ scopes = { Scope.CLUSTER, Scope.LOCAL },
+ description = "MonitorActivity stores the last timestamp at each node
as state, "
+ + "so that it can examine activity at cluster wide. "
+ + "If 'Copy Attribute' is set to true, then flow file
attributes are also persisted. "
+ + "In local scope, it stores last known activity timestamp if
the flow is inactive."
+)
public class MonitorActivity extends AbstractProcessor {
+ public static final String STATE_KEY_LATEST_SUCCESS_TRANSFER =
"MonitorActivity.latestSuccessTransfer";
+ public static final String STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO =
"LocalFlowActivityInfo.lastSuccessfulTransfer";
Review Comment:
`lastSuccessfulTransfer` is used in every other places. For sake of
consistency, I would suggest renaming it too:
```suggestion
public static final String STATE_KEY_COMMON_FLOW_ACTIVITY_INFO =
"CommonFlowActivityInfo.lastSuccessfulTransfer";
public static final String STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO =
"LocalFlowActivityInfo.lastSuccessfulTransfer";
```
##########
nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java:
##########
@@ -217,224 +230,221 @@ protected List<PropertyDescriptor>
getSupportedPropertyDescriptors() {
public void onScheduled(final ProcessContext context) {
// Check configuration.
isClusterScope(context, true);
- resetLastSuccessfulTransfer();
- inactive.set(false);
- hasSuccessTransfer.set(false);
- }
-
-
- protected void resetLastSuccessfulTransfer() {
- setLastSuccessfulTransfer(System.currentTimeMillis());
- }
- protected final void setLastSuccessfulTransfer(final long timestamp) {
- latestSuccessTransfer.set(timestamp);
- latestReportedNodeState.set(timestamp);
- }
-
- protected final long getLatestSuccessTransfer() {
- return latestSuccessTransfer.get();
- }
-
- private boolean isClusterScope(final ProcessContext context, boolean
logInvalidConfig) {
- if
(SCOPE_CLUSTER.equals(context.getProperty(MONITORING_SCOPE).getValue())) {
- if (getNodeTypeProvider().isConfiguredForClustering()) {
- return true;
- }
- if (logInvalidConfig) {
- getLogger().warn("NiFi is running as a Standalone mode, but
'cluster' scope is set." +
- " Fallback to 'node' scope. Fix configuration to stop
this message.");
- }
+ final long thresholdMillis =
context.getProperty(THRESHOLD).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+ final boolean copyAttributes =
context.getProperty(COPY_ATTRIBUTES).asBoolean();
+ final boolean resetStateOnRestart =
context.getProperty(RESET_STATE_ON_RESTART).asBoolean();
+
+ // Attempt to load last state by the time of stopping this processor.
A local state only exists if
+ // the monitored flow was already inactive, when the processor was
shutting down.
+ final String storedLastSuccessfulTransfer = resetStateOnRestart ? null
: tryLoadLastSuccessfulTransfer(context);
+
+ if (storedLastSuccessfulTransfer != null) {
+ // Initialize local flow as being inactive since the stored
timestamp.
+ localFlowActivityInfo = new LocalFlowActivityInfo(
+ getStartupTime(), thresholdMillis, copyAttributes,
Long.parseLong(storedLastSuccessfulTransfer));
+ wasActive.set(localFlowActivityInfo.isActive());
+ inactivityStartMillis.set(localFlowActivityInfo.getLastActivity());
+ } else {
+ // Initialize local flow as being active. If there is no traffic,
then it will eventually become inactive.
+ localFlowActivityInfo = new LocalFlowActivityInfo(
+ getStartupTime(), thresholdMillis, copyAttributes);
+ wasActive.set(true);
}
- return false;
}
- private boolean shouldReportOnlyOnPrimary(boolean isClusterScope, final
ProcessContext context) {
- if
(REPORT_NODE_PRIMARY.equals(context.getProperty(REPORTING_NODE).getValue())) {
- if (isClusterScope) {
- return true;
+ @OnStopped
+ public void onStopped(final ProcessContext context) {
+ if (getNodeTypeProvider().isConfiguredForClustering()) {
+ // Shared state needs to be cleared, in order to avoid getting
inactive markers right after starting the
+ // flow after a weekend stop. In single-node setup, there is no
shared state to be cleared, but the line
+ // below would also wipe out the local state. Hence, the check.
+ final StateManager stateManager = context.getStateManager();
+ try {
+ stateManager.clear(Scope.CLUSTER);
Review Comment:
`getNodeTypeProvider().isConfiguredForClustering()` returns `true` for a
disconnected node and it would clear its local state on restart (cluster scope
falls back to local scope in case of disconnected nodes).
As only the shared state needs to cleared but not the local one, I suggest
adding `context.isConnectedToCluster()` to the condition.
##########
nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java:
##########
@@ -217,224 +230,221 @@ protected List<PropertyDescriptor>
getSupportedPropertyDescriptors() {
public void onScheduled(final ProcessContext context) {
// Check configuration.
isClusterScope(context, true);
- resetLastSuccessfulTransfer();
- inactive.set(false);
- hasSuccessTransfer.set(false);
- }
-
-
- protected void resetLastSuccessfulTransfer() {
- setLastSuccessfulTransfer(System.currentTimeMillis());
- }
- protected final void setLastSuccessfulTransfer(final long timestamp) {
- latestSuccessTransfer.set(timestamp);
- latestReportedNodeState.set(timestamp);
- }
-
- protected final long getLatestSuccessTransfer() {
- return latestSuccessTransfer.get();
- }
-
- private boolean isClusterScope(final ProcessContext context, boolean
logInvalidConfig) {
- if
(SCOPE_CLUSTER.equals(context.getProperty(MONITORING_SCOPE).getValue())) {
- if (getNodeTypeProvider().isConfiguredForClustering()) {
- return true;
- }
- if (logInvalidConfig) {
- getLogger().warn("NiFi is running as a Standalone mode, but
'cluster' scope is set." +
- " Fallback to 'node' scope. Fix configuration to stop
this message.");
- }
+ final long thresholdMillis =
context.getProperty(THRESHOLD).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+ final boolean copyAttributes =
context.getProperty(COPY_ATTRIBUTES).asBoolean();
+ final boolean resetStateOnRestart =
context.getProperty(RESET_STATE_ON_RESTART).asBoolean();
+
+ // Attempt to load last state by the time of stopping this processor.
A local state only exists if
+ // the monitored flow was already inactive, when the processor was
shutting down.
+ final String storedLastSuccessfulTransfer = resetStateOnRestart ? null
: tryLoadLastSuccessfulTransfer(context);
+
+ if (storedLastSuccessfulTransfer != null) {
+ // Initialize local flow as being inactive since the stored
timestamp.
+ localFlowActivityInfo = new LocalFlowActivityInfo(
+ getStartupTime(), thresholdMillis, copyAttributes,
Long.parseLong(storedLastSuccessfulTransfer));
+ wasActive.set(localFlowActivityInfo.isActive());
+ inactivityStartMillis.set(localFlowActivityInfo.getLastActivity());
+ } else {
+ // Initialize local flow as being active. If there is no traffic,
then it will eventually become inactive.
+ localFlowActivityInfo = new LocalFlowActivityInfo(
+ getStartupTime(), thresholdMillis, copyAttributes);
+ wasActive.set(true);
}
- return false;
}
- private boolean shouldReportOnlyOnPrimary(boolean isClusterScope, final
ProcessContext context) {
- if
(REPORT_NODE_PRIMARY.equals(context.getProperty(REPORTING_NODE).getValue())) {
- if (isClusterScope) {
- return true;
+ @OnStopped
+ public void onStopped(final ProcessContext context) {
+ if (getNodeTypeProvider().isConfiguredForClustering()) {
+ // Shared state needs to be cleared, in order to avoid getting
inactive markers right after starting the
+ // flow after a weekend stop. In single-node setup, there is no
shared state to be cleared, but the line
+ // below would also wipe out the local state. Hence, the check.
+ final StateManager stateManager = context.getStateManager();
+ try {
+ stateManager.clear(Scope.CLUSTER);
+ } catch (IOException e) {
+ getLogger().error("Failed to clear cluster state due to " + e,
e);
Review Comment:
It is enough to pass the exception once and its message and stack trace will
be logged. Please remove the other "due to" clauses too.
```suggestion
getLogger().error("Failed to clear cluster state", e);
```
##########
nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java:
##########
@@ -170,18 +184,16 @@ public class MonitorActivity extends AbstractProcessor {
.description("This relationship is used to transfer an Activity
Restored indicator when FlowFiles are routing to 'success' following a "
+ "period of inactivity")
.build();
- public static final Charset UTF8 = Charset.forName("UTF-8");
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
- private final AtomicLong latestSuccessTransfer = new
AtomicLong(System.currentTimeMillis());
- private final AtomicLong latestReportedNodeState = new
AtomicLong(System.currentTimeMillis());
- private final AtomicBoolean inactive = new AtomicBoolean(false);
- private final AtomicBoolean hasSuccessTransfer = new AtomicBoolean(false);
private final AtomicBoolean connectedWhenLastTriggered = new
AtomicBoolean(false);
- private final AtomicLong lastInactiveMessage = new
AtomicLong(System.currentTimeMillis());
- public static final String STATE_KEY_LATEST_SUCCESS_TRANSFER =
"MonitorActivity.latestSuccessTransfer";
+ private final AtomicLong lastInactiveMessage = new AtomicLong();
+ private final AtomicLong inactivityStartMillis = new
AtomicLong(System.currentTimeMillis());
+ private final AtomicBoolean wasActive = new AtomicBoolean(true);
+
+ private LocalFlowActivityInfo localFlowActivityInfo;
Review Comment:
The field should be `volatile` because it is accessed by different threads
(written by `onScheduled()`, read by `onTrigger()`)
```suggestion
private volatile LocalFlowActivityInfo localFlowActivityInfo;
```
##########
nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java:
##########
@@ -217,224 +230,221 @@ protected List<PropertyDescriptor>
getSupportedPropertyDescriptors() {
public void onScheduled(final ProcessContext context) {
// Check configuration.
isClusterScope(context, true);
- resetLastSuccessfulTransfer();
- inactive.set(false);
- hasSuccessTransfer.set(false);
- }
-
-
- protected void resetLastSuccessfulTransfer() {
- setLastSuccessfulTransfer(System.currentTimeMillis());
- }
- protected final void setLastSuccessfulTransfer(final long timestamp) {
- latestSuccessTransfer.set(timestamp);
- latestReportedNodeState.set(timestamp);
- }
-
- protected final long getLatestSuccessTransfer() {
- return latestSuccessTransfer.get();
- }
-
- private boolean isClusterScope(final ProcessContext context, boolean
logInvalidConfig) {
- if
(SCOPE_CLUSTER.equals(context.getProperty(MONITORING_SCOPE).getValue())) {
- if (getNodeTypeProvider().isConfiguredForClustering()) {
- return true;
- }
- if (logInvalidConfig) {
- getLogger().warn("NiFi is running as a Standalone mode, but
'cluster' scope is set." +
- " Fallback to 'node' scope. Fix configuration to stop
this message.");
- }
+ final long thresholdMillis =
context.getProperty(THRESHOLD).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+ final boolean copyAttributes =
context.getProperty(COPY_ATTRIBUTES).asBoolean();
+ final boolean resetStateOnRestart =
context.getProperty(RESET_STATE_ON_RESTART).asBoolean();
+
+ // Attempt to load last state by the time of stopping this processor.
A local state only exists if
+ // the monitored flow was already inactive, when the processor was
shutting down.
+ final String storedLastSuccessfulTransfer = resetStateOnRestart ? null
: tryLoadLastSuccessfulTransfer(context);
+
+ if (storedLastSuccessfulTransfer != null) {
+ // Initialize local flow as being inactive since the stored
timestamp.
+ localFlowActivityInfo = new LocalFlowActivityInfo(
+ getStartupTime(), thresholdMillis, copyAttributes,
Long.parseLong(storedLastSuccessfulTransfer));
+ wasActive.set(localFlowActivityInfo.isActive());
+ inactivityStartMillis.set(localFlowActivityInfo.getLastActivity());
+ } else {
+ // Initialize local flow as being active. If there is no traffic,
then it will eventually become inactive.
+ localFlowActivityInfo = new LocalFlowActivityInfo(
+ getStartupTime(), thresholdMillis, copyAttributes);
+ wasActive.set(true);
}
- return false;
}
- private boolean shouldReportOnlyOnPrimary(boolean isClusterScope, final
ProcessContext context) {
- if
(REPORT_NODE_PRIMARY.equals(context.getProperty(REPORTING_NODE).getValue())) {
- if (isClusterScope) {
- return true;
+ @OnStopped
+ public void onStopped(final ProcessContext context) {
+ if (getNodeTypeProvider().isConfiguredForClustering()) {
+ // Shared state needs to be cleared, in order to avoid getting
inactive markers right after starting the
+ // flow after a weekend stop. In single-node setup, there is no
shared state to be cleared, but the line
+ // below would also wipe out the local state. Hence, the check.
+ final StateManager stateManager = context.getStateManager();
+ try {
+ stateManager.clear(Scope.CLUSTER);
+ } catch (IOException e) {
+ getLogger().error("Failed to clear cluster state due to " + e,
e);
}
}
- return false;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) {
- final long thresholdMillis =
context.getProperty(THRESHOLD).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
- final long now = System.currentTimeMillis();
-
final ComponentLog logger = getLogger();
- final boolean copyAttributes =
context.getProperty(COPY_ATTRIBUTES).asBoolean();
- final boolean waitForActivity =
context.getProperty(WAIT_FOR_ACTIVITY).asBoolean();
+
final boolean isClusterScope = isClusterScope(context, false);
final boolean isConnectedToCluster = context.isConnectedToCluster();
- final boolean shouldReportOnlyOnPrimary =
shouldReportOnlyOnPrimary(isClusterScope, context);
+ final boolean wasActive = this.wasActive.get();
+
final List<FlowFile> flowFiles = session.get(50);
- if (isClusterScope(context, true)) {
- if (isReconnectedToCluster(isConnectedToCluster)) {
- reconcileState(context);
- connectedWhenLastTriggered.set(true);
- } else if (!isConnectedToCluster) {
- connectedWhenLastTriggered.set(false);
+ if (!flowFiles.isEmpty()) {
+ final boolean firstKnownTransfer =
!localFlowActivityInfo.hasSuccessfulTransfer();
+ final boolean flowStateMustBecomeActive = !wasActive ||
firstKnownTransfer;
+
+ localFlowActivityInfo.update(flowFiles.get(0));
+
+ if (isClusterScope && flowStateMustBecomeActive) {
+ localFlowActivityInfo.forceSync();
}
}
- boolean isInactive = false;
- long updatedLatestSuccessTransfer = -1;
- StateMap clusterState = null;
-
- if (flowFiles.isEmpty()) {
- final long previousSuccessMillis = latestSuccessTransfer.get();
-
- boolean sendInactiveMarker = false;
-
- isInactive = (now >= previousSuccessMillis + thresholdMillis);
- logger.debug("isInactive={}, previousSuccessMillis={}, now={}",
new Object[]{isInactive, previousSuccessMillis, now});
- if (isInactive && isClusterScope && isConnectedToCluster) {
- // Even if this node has been inactive, there may be other
nodes handling flow actively.
- // However, if this node is active, we don't have to look at
cluster state.
- try {
- clusterState = session.getState(Scope.CLUSTER);
- if (clusterState != null &&
!StringUtils.isEmpty(clusterState.get(STATE_KEY_LATEST_SUCCESS_TRANSFER))) {
- final long latestReportedClusterActivity =
Long.valueOf(clusterState.get(STATE_KEY_LATEST_SUCCESS_TRANSFER));
- isInactive = (now >= latestReportedClusterActivity +
thresholdMillis);
- if (!isInactive) {
- // This node has been inactive, but other node has
more recent activity.
- updatedLatestSuccessTransfer =
latestReportedClusterActivity;
- }
- logger.debug("isInactive={},
latestReportedClusterActivity={}", new Object[]{isInactive,
latestReportedClusterActivity});
- }
- } catch (IOException e) {
- logger.error("Failed to access cluster state. Activity
will not be monitored properly until this is addressed.", e);
- }
+ if (isClusterScope) {
+ if (wasActive && !localFlowActivityInfo.isActive()) {
+ localFlowActivityInfo.forceSync();
}
+ synchronizeState(context);
+ }
+
+ final long thresholdMillis =
context.getProperty(THRESHOLD).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+ final boolean continuallySendMessages =
context.getProperty(CONTINUALLY_SEND_MESSAGES).asBoolean();
+ final boolean waitForActivity =
context.getProperty(WAIT_FOR_ACTIVITY).asBoolean();
- if (isInactive) {
- final boolean continual =
context.getProperty(CONTINUALLY_SEND_MESSAGES).asBoolean();
- sendInactiveMarker = !inactive.getAndSet(true) || (continual
&& (now > lastInactiveMessage.get() + thresholdMillis));
- if (waitForActivity) {
- sendInactiveMarker = sendInactiveMarker &&
hasSuccessTransfer.get();
- }
+ final boolean isActive = localFlowActivityInfo.isActive() ||
!flowFiles.isEmpty();
+ final long lastActivity = localFlowActivityInfo.getLastActivity();
+ final long inactivityStartMillis = this.inactivityStartMillis.get();
+ final boolean timeToRepeatInactiveMessage = (lastInactiveMessage.get()
+ thresholdMillis) <= System.currentTimeMillis();
+
+ final boolean canBecomeInactive = (!isClusterScope ||
isConnectedToCluster)
+ && (!waitForActivity ||
localFlowActivityInfo.hasSuccessfulTransfer());
+
+ if (isActive) {
+ onTriggerActiveFlow(context, session, wasActive, isClusterScope,
inactivityStartMillis, flowFiles);
+ } else if (canBecomeInactive && (wasActive || (continuallySendMessages
&& timeToRepeatInactiveMessage))) {
+ onTriggerInactiveFlow(context, session, isClusterScope,
lastActivity);
+ } else {
+ context.yield(); // no need to dominate CPU checking times; let
other processors run for a bit.
+ }
+
+ if (wasActive && !canBecomeInactive) {
+ // We need to block ACTIVE -> INACTIVE state transition, because
we are not connected to the cluster.
+ // When we reconnect, and the INACTIVE state persists, then the
next onTrigger will do the transition.
+ logger.trace("ACTIVE->INACTIVE transition is blocked, because we
are not connected to the cluster.");
+ } else {
+ this.wasActive.set(isActive);
+ this.inactivityStartMillis.set(lastActivity);
+ }
+ }
+
+ protected long getStartupTime() {
+ return System.currentTimeMillis();
+ }
+
+ protected final long getLatestSuccessTransfer() {
+ return localFlowActivityInfo.getLastSuccessfulTransfer();
+ }
+
+ private String tryLoadLastSuccessfulTransfer(ProcessContext context) {
+ final StateManager stateManager = context.getStateManager();
+ try {
+ final StateMap localStateMap = stateManager.getState(Scope.LOCAL);
+ return localStateMap.get(STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO);
+ } catch (IOException e) {
+ throw new ProcessException("Failed to load local state due to " +
e, e);
+ }
+ }
+
+ private void synchronizeState(ProcessContext context) {
+ final ComponentLog logger = getLogger();
+ final boolean isConnectedToCluster = context.isConnectedToCluster();
+
+ if (isReconnectedToCluster(isConnectedToCluster)) {
+ localFlowActivityInfo.forceSync();
+ connectedWhenLastTriggered.set(true);
+ }
+ if (!isConnectedToCluster) {
+ connectedWhenLastTriggered.set(false);
+ } else if (localFlowActivityInfo.syncNeeded()) {
+ final CommonFlowActivityInfo commonFlowActivityInfo = new
CommonFlowActivityInfo(context);
+ localFlowActivityInfo.update(commonFlowActivityInfo);
+
+ try {
+ commonFlowActivityInfo.update(localFlowActivityInfo);
+ localFlowActivityInfo.setNextSyncMillis();
+ } catch (final SaveSharedFlowStateException ex) {
+ logger.debug("Failed to update common state.", ex);
}
+ }
+ }
- if (sendInactiveMarker && shouldThisNodeReport(isClusterScope,
shouldReportOnlyOnPrimary, context)) {
- lastInactiveMessage.set(System.currentTimeMillis());
+ private void onTriggerInactiveFlow(ProcessContext context, ProcessSession
session, boolean isClusterScope, long lastActivity) {
+ final ComponentLog logger = getLogger();
+ final boolean shouldReportOnlyOnPrimary =
shouldReportOnlyOnPrimary(isClusterScope, context);
+ final boolean shouldThisNodeReport =
shouldThisNodeReport(isClusterScope, shouldReportOnlyOnPrimary, context);
- FlowFile inactiveFlowFile = session.create();
- inactiveFlowFile = session.putAttribute(inactiveFlowFile,
"inactivityStartMillis", String.valueOf(previousSuccessMillis));
- inactiveFlowFile = session.putAttribute(inactiveFlowFile,
"inactivityDurationMillis", String.valueOf(now - previousSuccessMillis));
+ if (shouldThisNodeReport) {
+ sendInactivityMarker(context, session, lastActivity, logger);
+ }
+ lastInactiveMessage.set(System.currentTimeMillis());
+ setInactivityFlag(context.getStateManager());
+ }
- final byte[] outBytes =
context.getProperty(INACTIVITY_MESSAGE).evaluateAttributeExpressions(inactiveFlowFile).getValue().getBytes(UTF8);
- inactiveFlowFile = session.write(inactiveFlowFile, new
OutputStreamCallback() {
- @Override
- public void process(final OutputStream out) throws
IOException {
- out.write(outBytes);
- }
- });
+ private void onTriggerActiveFlow(ProcessContext context, ProcessSession
session, boolean wasActive, boolean isClusterScope,
+ long inactivityStartMillis, List<FlowFile> flowFiles) {
+ final ComponentLog logger = getLogger();
+ final boolean shouldReportOnlyOnPrimary =
shouldReportOnlyOnPrimary(isClusterScope, context);
+ final boolean shouldThisNodeReport =
shouldThisNodeReport(isClusterScope, shouldReportOnlyOnPrimary, context);
- session.getProvenanceReporter().create(inactiveFlowFile);
- session.transfer(inactiveFlowFile, REL_INACTIVE);
- logger.info("Transferred {} to 'inactive'", new
Object[]{inactiveFlowFile});
- } else {
- context.yield(); // no need to dominate CPU checking times;
let other processors run for a bit.
+ if (!wasActive) {
+ if (shouldThisNodeReport) {
Review Comment:
`shouldThisNodeReport` local variable could be moved into `if (!wasActive) {
... }`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]