This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 2ea497f81c NIFI-13072: Fix MonitorActivity problems with cluster scope flow monitoring
2ea497f81c is described below
commit 2ea497f81c83606e2daa931b5ad7cb0a267ae850
Author: Rajmund Takacs <[email protected]>
AuthorDate: Thu Apr 4 17:09:59 2024 +0200
NIFI-13072: Fix MonitorActivity problems with cluster scope flow monitoring
This closes #8669.
Signed-off-by: Peter Turcsanyi <[email protected]>
(cherry picked from commit bffacdec982a9f3f0bf6004bcecbd21d1049e401)
---
.../nifi/processors/standard/MonitorActivity.java | 624 +++++++++++++--------
.../processors/standard/TestMonitorActivity.java | 615 +++++++++++++++++---
2 files changed, 942 insertions(+), 297 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
index 7021f97fc9..43efc3f82b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
@@ -16,6 +16,20 @@
*/
package org.apache.nifi.processors.standard;
+import static java.util.Collections.singletonMap;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
@@ -30,7 +44,6 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
@@ -44,23 +57,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.util.StringUtils;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
@SideEffectFree
@TriggerSerially
@@ -72,10 +69,18 @@ import java.util.concurrent.atomic.AtomicLong;
@WritesAttributes({
@WritesAttribute(attribute = "inactivityStartMillis", description = "The
time at which Inactivity began, in the form of milliseconds since Epoch"),
@WritesAttribute(attribute = "inactivityDurationMillis", description =
"The number of milliseconds that the inactivity has spanned")})
-@Stateful(scopes = Scope.CLUSTER, description = "MonitorActivity stores the
last timestamp at each node as state, so that it can examine activity at
cluster wide." +
- "If 'Copy Attribute' is set to true, then flow file attributes are
also persisted.")
+@Stateful(
+ scopes = { Scope.CLUSTER, Scope.LOCAL },
+ description = "MonitorActivity stores the last timestamp at each node
as state, "
+ + "so that it can examine activity at cluster wide. "
+ + "If 'Copy Attribute' is set to true, then flow file
attributes are also persisted. "
+ + "In local scope, it stores last known activity timestamp if
the flow is inactive."
+)
public class MonitorActivity extends AbstractProcessor {
+ public static final String STATE_KEY_COMMON_FLOW_ACTIVITY_INFO =
"CommonFlowActivityInfo.lastSuccessfulTransfer";
+ public static final String STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO =
"LocalFlowActivityInfo.lastSuccessfulTransfer";
+
public static final AllowableValue SCOPE_NODE = new AllowableValue("node");
public static final AllowableValue SCOPE_CLUSTER = new
AllowableValue("cluster");
public static final AllowableValue REPORT_NODE_ALL = new
AllowableValue("all");
@@ -113,6 +118,14 @@ public class MonitorActivity extends AbstractProcessor {
.allowableValues("true", "false")
.defaultValue("false")
.build();
+ public static final PropertyDescriptor RESET_STATE_ON_RESTART = new
PropertyDescriptor.Builder()
+ .name("Reset State on Restart")
+ .description("When the processor gets started or restarted, if set
to true, the initial state will always be active. "
+ + "Otherwise, the last reported flow state will be
preserved.")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .build();
public static final PropertyDescriptor INACTIVITY_MESSAGE = new
PropertyDescriptor.Builder()
.name("Inactivity Message")
.description("The message that will be the content of FlowFiles
that are sent to the 'inactive' relationship")
@@ -124,7 +137,7 @@ public class MonitorActivity extends AbstractProcessor {
public static final PropertyDescriptor COPY_ATTRIBUTES = new
PropertyDescriptor.Builder()
.name("Copy Attributes")
.description("If true, will copy all flow file attributes from the
flow file that resumed activity to the newly created indicator flow file")
- .required(false)
+ .required(true)
.allowableValues("true", "false")
.defaultValue("false")
.build();
@@ -148,11 +161,7 @@ public class MonitorActivity extends AbstractProcessor {
" even if it's 'primary', NiFi act as 'all'.")
.required(true)
.allowableValues(REPORT_NODE_ALL, REPORT_NODE_PRIMARY)
- .addValidator(((subject, input, context) -> {
- boolean invalid = REPORT_NODE_PRIMARY.equals(input) &&
SCOPE_NODE.equals(context.getProperty(MONITORING_SCOPE).getValue());
- return new
ValidationResult.Builder().subject(subject).input(input)
- .explanation("'" + REPORT_NODE_PRIMARY + "' is only
available with '" + SCOPE_CLUSTER + "' scope.").valid(!invalid).build();
- }))
+ .dependsOn(MONITORING_SCOPE, SCOPE_CLUSTER)
.defaultValue(REPORT_NODE_ALL.getValue())
.build();
@@ -170,18 +179,16 @@ public class MonitorActivity extends AbstractProcessor {
.description("This relationship is used to transfer an Activity
Restored indicator when FlowFiles are routing to 'success' following a "
+ "period of inactivity")
.build();
- public static final Charset UTF8 = Charset.forName("UTF-8");
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
- private final AtomicLong latestSuccessTransfer = new
AtomicLong(System.currentTimeMillis());
- private final AtomicLong latestReportedNodeState = new
AtomicLong(System.currentTimeMillis());
- private final AtomicBoolean inactive = new AtomicBoolean(false);
- private final AtomicBoolean hasSuccessTransfer = new AtomicBoolean(false);
private final AtomicBoolean connectedWhenLastTriggered = new
AtomicBoolean(false);
- private final AtomicLong lastInactiveMessage = new
AtomicLong(System.currentTimeMillis());
- public static final String STATE_KEY_LATEST_SUCCESS_TRANSFER =
"MonitorActivity.latestSuccessTransfer";
+ private final AtomicLong lastInactiveMessage = new AtomicLong();
+ private final AtomicLong inactivityStartMillis = new
AtomicLong(System.currentTimeMillis());
+ private final AtomicBoolean wasActive = new AtomicBoolean(true);
+
+ private volatile LocalFlowActivityInfo localFlowActivityInfo;
@Override
protected void init(final ProcessorInitializationContext context) {
@@ -191,6 +198,7 @@ public class MonitorActivity extends AbstractProcessor {
properties.add(INACTIVITY_MESSAGE);
properties.add(ACTIVITY_RESTORED_MESSAGE);
properties.add(WAIT_FOR_ACTIVITY);
+ properties.add(RESET_STATE_ON_RESTART);
properties.add(COPY_ATTRIBUTES);
properties.add(MONITORING_SCOPE);
properties.add(REPORTING_NODE);
@@ -217,224 +225,214 @@ public class MonitorActivity extends AbstractProcessor {
public void onScheduled(final ProcessContext context) {
// Check configuration.
isClusterScope(context, true);
- resetLastSuccessfulTransfer();
- inactive.set(false);
- hasSuccessTransfer.set(false);
- }
-
- protected void resetLastSuccessfulTransfer() {
- setLastSuccessfulTransfer(System.currentTimeMillis());
- }
-
- protected final void setLastSuccessfulTransfer(final long timestamp) {
- latestSuccessTransfer.set(timestamp);
- latestReportedNodeState.set(timestamp);
- }
-
- protected final long getLatestSuccessTransfer() {
- return latestSuccessTransfer.get();
- }
-
- private boolean isClusterScope(final ProcessContext context, boolean
logInvalidConfig) {
- if
(SCOPE_CLUSTER.equals(context.getProperty(MONITORING_SCOPE).getValue())) {
- if (getNodeTypeProvider().isConfiguredForClustering()) {
- return true;
- }
- if (logInvalidConfig) {
- getLogger().warn("NiFi is running as a Standalone mode, but
'cluster' scope is set." +
- " Fallback to 'node' scope. Fix configuration to stop
this message.");
- }
+ final long thresholdMillis =
context.getProperty(THRESHOLD).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+ final boolean copyAttributes =
context.getProperty(COPY_ATTRIBUTES).asBoolean();
+ final boolean resetStateOnRestart =
context.getProperty(RESET_STATE_ON_RESTART).asBoolean();
+
+ // Attempt to load last state by the time of stopping this processor.
A local state only exists if
+ // the monitored flow was already inactive, when the processor was
shutting down.
+ final String storedLastSuccessfulTransfer = resetStateOnRestart ? null
: tryLoadLastSuccessfulTransfer(context);
+
+ if (storedLastSuccessfulTransfer != null) {
+ // Initialize local flow as being inactive since the stored
timestamp.
+ localFlowActivityInfo = new LocalFlowActivityInfo(
+ getStartupTime(), thresholdMillis, copyAttributes,
Long.parseLong(storedLastSuccessfulTransfer));
+ wasActive.set(localFlowActivityInfo.isActive());
+ inactivityStartMillis.set(localFlowActivityInfo.getLastActivity());
+ } else {
+ // Initialize local flow as being active. If there is no traffic,
then it will eventually become inactive.
+ localFlowActivityInfo = new LocalFlowActivityInfo(
+ getStartupTime(), thresholdMillis, copyAttributes);
+ wasActive.set(true);
}
- return false;
}
- private boolean shouldReportOnlyOnPrimary(boolean isClusterScope, final
ProcessContext context) {
- if
(REPORT_NODE_PRIMARY.equals(context.getProperty(REPORTING_NODE).getValue())) {
- if (isClusterScope) {
- return true;
+ @OnStopped
+ public void onStopped(final ProcessContext context) {
+ if (getNodeTypeProvider().isConfiguredForClustering() &&
context.isConnectedToCluster()) {
+ // Shared state needs to be cleared, in order to avoid getting
inactive markers right after starting the
+ // flow after a weekend stop. In single-node setup, there is no
shared state to be cleared, but the line
+ // below would also wipe out the local state. Hence, the check.
+ final StateManager stateManager = context.getStateManager();
+ try {
+ stateManager.clear(Scope.CLUSTER);
+ } catch (IOException e) {
+ getLogger().error("Failed to clear cluster state" + e, e);
}
}
- return false;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) {
- final long thresholdMillis =
context.getProperty(THRESHOLD).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
- final long now = System.currentTimeMillis();
-
final ComponentLog logger = getLogger();
- final boolean copyAttributes =
context.getProperty(COPY_ATTRIBUTES).asBoolean();
- final boolean waitForActivity =
context.getProperty(WAIT_FOR_ACTIVITY).asBoolean();
+
final boolean isClusterScope = isClusterScope(context, false);
final boolean isConnectedToCluster = context.isConnectedToCluster();
- final boolean shouldReportOnlyOnPrimary =
shouldReportOnlyOnPrimary(isClusterScope, context);
+ final boolean wasActive = this.wasActive.get();
+
final List<FlowFile> flowFiles = session.get(50);
- if (isClusterScope(context, true)) {
- if (isReconnectedToCluster(isConnectedToCluster)) {
- reconcileState(context);
- connectedWhenLastTriggered.set(true);
- } else if (!isConnectedToCluster) {
- connectedWhenLastTriggered.set(false);
+ if (!flowFiles.isEmpty()) {
+ final boolean firstKnownTransfer =
!localFlowActivityInfo.hasSuccessfulTransfer();
+ final boolean flowStateMustBecomeActive = !wasActive ||
firstKnownTransfer;
+
+ localFlowActivityInfo.update(flowFiles.get(0));
+
+ if (isClusterScope && flowStateMustBecomeActive) {
+ localFlowActivityInfo.forceSync();
}
+
+ session.transfer(flowFiles, REL_SUCCESS);
+ logger.info("Transferred {} FlowFiles to 'success'",
flowFiles.size());
+ } else {
+ context.yield();
}
- boolean isInactive = false;
- long updatedLatestSuccessTransfer = -1;
- StateMap clusterState = null;
-
- if (flowFiles.isEmpty()) {
- final long previousSuccessMillis = latestSuccessTransfer.get();
-
- boolean sendInactiveMarker = false;
-
- isInactive = (now >= previousSuccessMillis + thresholdMillis);
- logger.debug("isInactive={}, previousSuccessMillis={}, now={}",
new Object[]{isInactive, previousSuccessMillis, now});
- if (isInactive && isClusterScope && isConnectedToCluster) {
- // Even if this node has been inactive, there may be other
nodes handling flow actively.
- // However, if this node is active, we don't have to look at
cluster state.
- try {
- clusterState = session.getState(Scope.CLUSTER);
- if (clusterState != null &&
!StringUtils.isEmpty(clusterState.get(STATE_KEY_LATEST_SUCCESS_TRANSFER))) {
- final long latestReportedClusterActivity =
Long.valueOf(clusterState.get(STATE_KEY_LATEST_SUCCESS_TRANSFER));
- isInactive = (now >= latestReportedClusterActivity +
thresholdMillis);
- if (!isInactive) {
- // This node has been inactive, but other node has
more recent activity.
- updatedLatestSuccessTransfer =
latestReportedClusterActivity;
- }
- logger.debug("isInactive={},
latestReportedClusterActivity={}", new Object[]{isInactive,
latestReportedClusterActivity});
- }
- } catch (IOException e) {
- logger.error("Failed to access cluster state. Activity
will not be monitored properly until this is addressed.", e);
- }
+ if (isClusterScope) {
+ if (!wasActive || !localFlowActivityInfo.isActive()) {
+ localFlowActivityInfo.forceSync();
}
+ synchronizeState(context);
+ }
- if (isInactive) {
- final boolean continual =
context.getProperty(CONTINUALLY_SEND_MESSAGES).asBoolean();
- sendInactiveMarker = !inactive.getAndSet(true) || (continual
&& (now > lastInactiveMessage.get() + thresholdMillis));
- if (waitForActivity) {
- sendInactiveMarker = sendInactiveMarker &&
hasSuccessTransfer.get();
- }
+ final long thresholdMillis =
context.getProperty(THRESHOLD).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+ final boolean continuallySendMessages =
context.getProperty(CONTINUALLY_SEND_MESSAGES).asBoolean();
+ final boolean waitForActivity =
context.getProperty(WAIT_FOR_ACTIVITY).asBoolean();
+
+ final boolean isActive = localFlowActivityInfo.isActive() ||
!flowFiles.isEmpty();
+ final long lastActivity = localFlowActivityInfo.getLastActivity();
+ final long inactivityStartMillis = this.inactivityStartMillis.get();
+ final boolean timeToRepeatInactiveMessage = (lastInactiveMessage.get()
+ thresholdMillis) <= System.currentTimeMillis();
+
+ final boolean canReport = !isClusterScope || isConnectedToCluster ||
!flowFiles.isEmpty();
+ final boolean canChangeState = !waitForActivity ||
localFlowActivityInfo.hasSuccessfulTransfer();
+
+ if (canReport && canChangeState) {
+ if (isActive) {
+ onTriggerActiveFlow(context, session, wasActive,
isClusterScope, inactivityStartMillis);
+ } else if (wasActive || continuallySendMessages &&
timeToRepeatInactiveMessage) {
+ onTriggerInactiveFlow(context, session, isClusterScope,
lastActivity);
}
+ this.wasActive.set(isActive);
+ this.inactivityStartMillis.set(lastActivity);
+ } else {
+ // We need to block state transition, because we are not connected
to the cluster.
+ // When we reconnect, and the state persists, then the next
onTrigger will do the transition.
+ logger.trace("State transition is blocked, because we are not
connected to the cluster.");
+ }
+ }
- if (sendInactiveMarker && shouldThisNodeReport(isClusterScope,
shouldReportOnlyOnPrimary, context)) {
- lastInactiveMessage.set(System.currentTimeMillis());
+ protected long getStartupTime() {
+ return System.currentTimeMillis();
+ }
- FlowFile inactiveFlowFile = session.create();
- inactiveFlowFile = session.putAttribute(inactiveFlowFile,
"inactivityStartMillis", String.valueOf(previousSuccessMillis));
- inactiveFlowFile = session.putAttribute(inactiveFlowFile,
"inactivityDurationMillis", String.valueOf(now - previousSuccessMillis));
+ protected final long getLastSuccessfulTransfer() {
+ return localFlowActivityInfo.getLastSuccessfulTransfer();
+ }
- final byte[] outBytes =
context.getProperty(INACTIVITY_MESSAGE).evaluateAttributeExpressions(inactiveFlowFile).getValue().getBytes(UTF8);
- inactiveFlowFile = session.write(inactiveFlowFile, new
OutputStreamCallback() {
- @Override
- public void process(final OutputStream out) throws
IOException {
- out.write(outBytes);
- }
- });
+ private String tryLoadLastSuccessfulTransfer(ProcessContext context) {
+ final StateManager stateManager = context.getStateManager();
+ try {
+ final StateMap localStateMap = stateManager.getState(Scope.LOCAL);
+ return localStateMap.get(STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO);
+ } catch (IOException e) {
+ throw new ProcessException("Failed to load local state due to " +
e, e);
+ }
+ }
- session.getProvenanceReporter().create(inactiveFlowFile);
- session.transfer(inactiveFlowFile, REL_INACTIVE);
- logger.info("Transferred {} to 'inactive'", new
Object[]{inactiveFlowFile});
- } else {
- context.yield(); // no need to dominate CPU checking times;
let other processors run for a bit.
+ private void synchronizeState(ProcessContext context) {
+ final ComponentLog logger = getLogger();
+ final boolean isConnectedToCluster = context.isConnectedToCluster();
+
+ if (isReconnectedToCluster(isConnectedToCluster)) {
+ localFlowActivityInfo.forceSync();
+ connectedWhenLastTriggered.set(true);
+ }
+ if (!isConnectedToCluster) {
+ connectedWhenLastTriggered.set(false);
+ } else if (localFlowActivityInfo.syncNeeded()) {
+ final CommonFlowActivityInfo commonFlowActivityInfo = new
CommonFlowActivityInfo(context);
+ localFlowActivityInfo.update(commonFlowActivityInfo);
+
+ try {
+ commonFlowActivityInfo.update(localFlowActivityInfo);
+ localFlowActivityInfo.setNextSyncMillis();
+ } catch (final SaveSharedFlowStateException ex) {
+ logger.debug("Failed to update common state.", ex);
}
+ }
+ }
- } else {
- session.transfer(flowFiles, REL_SUCCESS);
- hasSuccessTransfer.set(true);
- updatedLatestSuccessTransfer = now;
- logger.info("Transferred {} FlowFiles to 'success'", new
Object[]{flowFiles.size()});
-
- final long latestStateReportTimestamp =
latestReportedNodeState.get();
- if (isClusterScope
- && isConnectedToCluster
- && (now - latestStateReportTimestamp) > (thresholdMillis /
3)) {
- // We don't want to hit the state manager every onTrigger(),
but often enough to detect activeness.
- try {
- final StateMap state = session.getState(Scope.CLUSTER);
-
- final Map<String, String> newValues = new HashMap<>();
-
- // Persist attributes so that other nodes can copy it
- if (copyAttributes) {
- newValues.putAll(flowFiles.get(0).getAttributes());
- }
- newValues.put(STATE_KEY_LATEST_SUCCESS_TRANSFER,
String.valueOf(now));
-
- if (state == null || state.getVersion() == -1) {
- session.setState(newValues, Scope.CLUSTER);
- } else {
- final String existingTimestamp =
state.get(STATE_KEY_LATEST_SUCCESS_TRANSFER);
- if (StringUtils.isEmpty(existingTimestamp)
- || Long.parseLong(existingTimestamp) < now) {
- // If this returns false due to race condition,
it's not a problem since we just need
- // the latest active timestamp.
- session.replaceState(state, newValues,
Scope.CLUSTER);
- } else {
- logger.debug("Existing state has more recent
timestamp, didn't update state.");
- }
- }
- latestReportedNodeState.set(now);
- } catch (IOException e) {
- logger.error("Failed to access cluster state. Activity
will not be monitored properly until this is addressed.", e);
- }
+ private void onTriggerInactiveFlow(ProcessContext context, ProcessSession
session, boolean isClusterScope, long lastActivity) {
+ final ComponentLog logger = getLogger();
+ final boolean shouldThisNodeReport =
shouldThisNodeReport(isClusterScope, context);
+
+ if (shouldThisNodeReport) {
+ sendInactivityMarker(context, session, lastActivity, logger);
+ }
+ lastInactiveMessage.set(System.currentTimeMillis());
+ setInactivityFlag(context.getStateManager());
+ }
+
+ private void onTriggerActiveFlow(ProcessContext context, ProcessSession
session, boolean wasActive, boolean isClusterScope,
+ long inactivityStartMillis) {
+ final ComponentLog logger = getLogger();
+ final boolean shouldThisNodeReport =
shouldThisNodeReport(isClusterScope, context);
+
+ if (!wasActive) {
+ if (shouldThisNodeReport) {
+ final Map<String, String> attributes =
localFlowActivityInfo.getLastSuccessfulTransferAttributes();
+ sendActivationMarker(context, session, attributes,
inactivityStartMillis, logger);
}
+ clearInactivityFlag(context.getStateManager());
}
+ }
- if (!isInactive) {
- final long inactivityStartMillis = latestSuccessTransfer.get();
- if (updatedLatestSuccessTransfer > -1) {
- latestSuccessTransfer.set(updatedLatestSuccessTransfer);
+ private void setInactivityFlag(StateManager stateManager) {
+ try {
+ stateManager.setState(singletonMap(
+ STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO,
+ String.valueOf(localFlowActivityInfo.getLastActivity())
+ ), Scope.LOCAL);
+ } catch (IOException e) {
+ getLogger().error("Failed to set local state due to " + e, e);
+ }
+ }
+
+ private void clearInactivityFlag(StateManager stateManager) {
+ try {
+ stateManager.clear(Scope.LOCAL);
+ } catch (IOException e) {
+ throw new ProcessException("Failed to clear local state due to " +
e, e);
+ }
+ }
+
+ private boolean isClusterScope(final ProcessContext context, boolean
logInvalidConfig) {
+ if
(SCOPE_CLUSTER.getValue().equals(context.getProperty(MONITORING_SCOPE).getValue()))
{
+ if (getNodeTypeProvider().isConfiguredForClustering()) {
+ return true;
}
- if (inactive.getAndSet(false) &&
shouldThisNodeReport(isClusterScope, shouldReportOnlyOnPrimary, context)) {
- FlowFile activityRestoredFlowFile = session.create();
-
- if (copyAttributes) {
-
- final Map<String, String> attributes = new HashMap<>();
- if (flowFiles.size() > 0) {
- // copy attributes from the first flow file in the list
- attributes.putAll(flowFiles.get(0).getAttributes());
- } else if (clusterState != null) {
- attributes.putAll(clusterState.toMap());
- attributes.remove(STATE_KEY_LATEST_SUCCESS_TRANSFER);
- }
- // don't copy the UUID
- attributes.remove(CoreAttributes.UUID.key());
- activityRestoredFlowFile =
session.putAllAttributes(activityRestoredFlowFile, attributes);
- }
-
- activityRestoredFlowFile =
session.putAttribute(activityRestoredFlowFile, "inactivityStartMillis",
String.valueOf(inactivityStartMillis));
- activityRestoredFlowFile =
session.putAttribute(activityRestoredFlowFile, "inactivityDurationMillis",
String.valueOf(now - inactivityStartMillis));
-
- final byte[] outBytes =
context.getProperty(ACTIVITY_RESTORED_MESSAGE).evaluateAttributeExpressions(activityRestoredFlowFile).getValue().getBytes(UTF8);
- activityRestoredFlowFile =
session.write(activityRestoredFlowFile, out -> out.write(outBytes));
-
-
session.getProvenanceReporter().create(activityRestoredFlowFile);
- session.transfer(activityRestoredFlowFile,
REL_ACTIVITY_RESTORED);
- logger.info("Transferred {} to 'activity.restored'", new
Object[]{activityRestoredFlowFile});
+ if (logInvalidConfig) {
+ getLogger().warn("NiFi is running as a Standalone mode, but
'cluster' scope is set." +
+ " Fallback to 'node' scope. Fix configuration to stop
this message.");
}
}
+ return false;
}
- @OnStopped
- public void onStopped(final ProcessContext context) {
- if (getNodeTypeProvider().isPrimary()) {
- final StateManager stateManager = context.getStateManager();
- try {
- stateManager.clear(Scope.CLUSTER);
- } catch (IOException e) {
- getLogger().error("Failed to clear cluster state due to " + e,
e);
- }
+ private boolean shouldReportOnlyOnPrimary(boolean isClusterScope, final
ProcessContext context) {
+ if
(REPORT_NODE_PRIMARY.getValue().equals(context.getProperty(REPORTING_NODE).getValue()))
{
+ return isClusterScope;
}
+ return false;
}
/**
* Will return true when the last known state is "not connected" and the
current state is "connected". This might
- * happen when during last @OnTrigger the node was not connected but
currently it is (reconnection); or when the
+ * happen when during last @OnTrigger the node was not connected, but
currently it is (reconnection); or when the
* processor is triggered first time (initial connection).
- *
+ * <br />
* This second case is due to safety reasons: it is possible that during
the first trigger the node is not connected
* to the cluster thus the default value of the #connected attribute is
false and stays as false until it's proven
* otherwise.
@@ -447,25 +445,203 @@ public class MonitorActivity extends AbstractProcessor {
return !connectedWhenLastTriggered.get() && isConnectedToCluster;
}
- private void reconcileState(final ProcessContext context) {
- try {
- final StateMap state =
context.getStateManager().getState(Scope.CLUSTER);
- final Map<String, String> newState = new HashMap<>();
- newState.putAll(state.toMap());
+ private boolean shouldThisNodeReport(final boolean isClusterScope, final
ProcessContext context) {
+ final boolean shouldReportOnlyOnPrimary =
shouldReportOnlyOnPrimary(isClusterScope, context);
+ return !isClusterScope || (!shouldReportOnlyOnPrimary ||
getNodeTypeProvider().isPrimary());
+ }
- final long validLastSuccessTransfer =
StringUtils.isEmpty(state.get(STATE_KEY_LATEST_SUCCESS_TRANSFER))
- ? latestSuccessTransfer.get()
- :
Math.max(Long.valueOf(state.get(STATE_KEY_LATEST_SUCCESS_TRANSFER)),
latestSuccessTransfer.get());
+ private void sendInactivityMarker(ProcessContext context, ProcessSession
session, long inactivityStartMillis,
+ ComponentLog logger) {
+ FlowFile inactiveFlowFile = session.create();
+ inactiveFlowFile = session.putAttribute(
+ inactiveFlowFile,
+ "inactivityStartMillis", String.valueOf(inactivityStartMillis)
+ );
+ inactiveFlowFile = session.putAttribute(
+ inactiveFlowFile,
+ "inactivityDurationMillis",
+ String.valueOf(System.currentTimeMillis() -
inactivityStartMillis)
+ );
+
+ final byte[] outBytes =
context.getProperty(INACTIVITY_MESSAGE).evaluateAttributeExpressions(inactiveFlowFile).getValue().getBytes(
+ StandardCharsets.UTF_8);
+ inactiveFlowFile = session.write(inactiveFlowFile, out ->
out.write(outBytes));
+
+ session.getProvenanceReporter().create(inactiveFlowFile);
+ session.transfer(inactiveFlowFile, REL_INACTIVE);
+ logger.info("Transferred {} to 'inactive'", inactiveFlowFile);
+ }
- newState.put(STATE_KEY_LATEST_SUCCESS_TRANSFER,
String.valueOf(validLastSuccessTransfer));
- context.getStateManager().replace(state, newState, Scope.CLUSTER);
- } catch (IOException e) {
- getLogger().error("Could not reconcile state after (re)connection!
Reason: " + e.getMessage());
- throw new ProcessException(e);
+ private void sendActivationMarker(ProcessContext context, ProcessSession
session, Map<String, String> attributes,
+ long inactivityStartMillis, ComponentLog logger) {
+ FlowFile activityRestoredFlowFile = session.create();
+ // don't copy the UUID
+ attributes.remove(CoreAttributes.UUID.key());
+ activityRestoredFlowFile =
session.putAllAttributes(activityRestoredFlowFile, attributes);
+
+ activityRestoredFlowFile =
session.putAttribute(activityRestoredFlowFile, "inactivityStartMillis",
String.valueOf(
+ inactivityStartMillis));
+ activityRestoredFlowFile =
session.putAttribute(activityRestoredFlowFile, "inactivityDurationMillis",
String.valueOf(System.currentTimeMillis() - inactivityStartMillis));
+
+ final byte[] outBytes =
context.getProperty(ACTIVITY_RESTORED_MESSAGE).evaluateAttributeExpressions(activityRestoredFlowFile).getValue().getBytes(
+ StandardCharsets.UTF_8);
+ activityRestoredFlowFile = session.write(activityRestoredFlowFile, out
-> out.write(outBytes));
+
+ session.getProvenanceReporter().create(activityRestoredFlowFile);
+ session.transfer(activityRestoredFlowFile, REL_ACTIVITY_RESTORED);
+ logger.info("Transferred {} to 'activity.restored'",
activityRestoredFlowFile);
+ }
+
+ private static class LocalFlowActivityInfo {
+ private static final long NO_VALUE = 0;
+
+ private final long startupTimeMillis;
+ private final long thresholdMillis;
+ private final boolean saveAttributes;
+
+ private long nextSyncMillis = NO_VALUE;
+ private long lastSuccessfulTransfer = NO_VALUE;
+ private Map<String, String> lastSuccessfulTransferAttributes = new
HashMap<>();
+
+ public LocalFlowActivityInfo(long startupTimeMillis, long
thresholdMillis, boolean saveAttributes) {
+ this.startupTimeMillis = startupTimeMillis;
+ this.thresholdMillis = thresholdMillis;
+ this.saveAttributes = saveAttributes;
+ }
+
+ public LocalFlowActivityInfo(long startupTimeMillis, long
thresholdMillis, boolean saveAttributes, long initialLastSuccessfulTransfer) {
+ this(startupTimeMillis, thresholdMillis, saveAttributes);
+ lastSuccessfulTransfer = initialLastSuccessfulTransfer;
+ }
+
+ public boolean syncNeeded() {
+ return nextSyncMillis <= System.currentTimeMillis();
+ }
+
+ public void setNextSyncMillis() {
+ nextSyncMillis = System.currentTimeMillis() + (thresholdMillis /
3);
+ }
+
+ public void forceSync() {
+ nextSyncMillis = System.currentTimeMillis();
+ }
+
+ public boolean isActive() {
+ if (hasSuccessfulTransfer()) {
+ return System.currentTimeMillis() < (lastSuccessfulTransfer +
thresholdMillis);
+ } else {
+ return System.currentTimeMillis() < (startupTimeMillis +
thresholdMillis);
+ }
+ }
+
+ public boolean hasSuccessfulTransfer() {
+ return lastSuccessfulTransfer != NO_VALUE;
+ }
+
+ public long getLastSuccessfulTransfer() {
+ return lastSuccessfulTransfer;
+ }
+
+ public long getLastActivity() {
+ if (hasSuccessfulTransfer()) {
+ return lastSuccessfulTransfer;
+ } else {
+ return startupTimeMillis;
+ }
+ }
+
+ public Map<String, String> getLastSuccessfulTransferAttributes() {
+ return lastSuccessfulTransferAttributes;
+ }
+
+ public void update(FlowFile flowFile) {
+ this.lastSuccessfulTransfer = System.currentTimeMillis();
+ if (saveAttributes) {
+ lastSuccessfulTransferAttributes = new
HashMap<>(flowFile.getAttributes());
+
lastSuccessfulTransferAttributes.remove(CoreAttributes.UUID.key());
+ }
+ }
+
+ public void update(CommonFlowActivityInfo commonFlowActivityInfo) {
+ if (!commonFlowActivityInfo.hasSuccessfulTransfer()) {
+ return;
+ }
+
+ final long lastSuccessfulTransfer =
commonFlowActivityInfo.getLastSuccessfulTransfer();
+
+ if (lastSuccessfulTransfer <= getLastSuccessfulTransfer()) {
+ return;
+ }
+
+ this.lastSuccessfulTransfer = lastSuccessfulTransfer;
+ if (saveAttributes) {
+ lastSuccessfulTransferAttributes =
commonFlowActivityInfo.getLastSuccessfulTransferAttributes();
+ }
}
}
- private boolean shouldThisNodeReport(final boolean isClusterScope, final
boolean isReportOnlyOnPrimary, final ProcessContext context) {
- return !isClusterScope || ((!isReportOnlyOnPrimary ||
getNodeTypeProvider().isPrimary()) && context.isConnectedToCluster());
+ private static class CommonFlowActivityInfo {
+ private final StateManager stateManager;
+ private final StateMap storedState;
+ private final Map<String, String> newState = new HashMap<>();
+
+ public CommonFlowActivityInfo(ProcessContext context) {
+ this.stateManager = context.getStateManager();
+ try {
+ storedState = stateManager.getState(Scope.CLUSTER);
+ } catch (IOException e) {
+ throw new ProcessException("Cannot load common flow activity
info.", e);
+ }
+ }
+
+ public boolean hasSuccessfulTransfer() {
+ return storedState.get(STATE_KEY_COMMON_FLOW_ACTIVITY_INFO) !=
null;
+ }
+
+ public long getLastSuccessfulTransfer() {
+ return
Long.parseLong(storedState.get(STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
+ }
+
+ public Map<String, String> getLastSuccessfulTransferAttributes() {
+ final Map<String, String> result = new
HashMap<>(storedState.toMap());
+ result.remove(STATE_KEY_COMMON_FLOW_ACTIVITY_INFO);
+ return result;
+ }
+
+ public void update(LocalFlowActivityInfo localFlowActivityInfo) {
+ if (!localFlowActivityInfo.hasSuccessfulTransfer()) {
+ return;
+ }
+
+ final long lastSuccessfulTransfer =
localFlowActivityInfo.getLastSuccessfulTransfer();
+
+ if (hasSuccessfulTransfer() && (lastSuccessfulTransfer <=
getLastSuccessfulTransfer())) {
+ return;
+ }
+
+
newState.putAll(localFlowActivityInfo.getLastSuccessfulTransferAttributes());
+ newState.put(STATE_KEY_COMMON_FLOW_ACTIVITY_INFO,
String.valueOf(lastSuccessfulTransfer));
+
+ final boolean wasSuccessful;
+ try {
+ wasSuccessful = stateManager.replace(storedState, newState,
Scope.CLUSTER);
+ } catch (IOException e) {
+ throw new SaveSharedFlowStateException("Caught exception while
saving state.", e);
+ }
+
+ if (!wasSuccessful) {
+ throw new SaveSharedFlowStateException("Failed to save state.
Probably there was a concurrent update.");
+ }
+ }
+ }
+
+ private static class SaveSharedFlowStateException extends ProcessException
{
+ public SaveSharedFlowStateException(String message) {
+ super(message);
+ }
+
+ public SaveSharedFlowStateException(String message, Throwable cause) {
+ super(message, cause);
+ }
}
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java
index a38ca868b0..da5c288005 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java
@@ -16,15 +16,23 @@
*/
package org.apache.nifi.processors.standard;
+import static java.lang.System.currentTimeMillis;
+import static java.util.Collections.singletonMap;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
import java.io.IOException;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
-
import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.util.MockFlowFile;
@@ -33,16 +41,10 @@ import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
public class TestMonitorActivity {
@Test
- public void testFirstMessage() {
+ public void testFirstMessage() throws InterruptedException {
final TestableProcessor processor = new TestableProcessor(1000);
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
@@ -53,7 +55,7 @@ public class TestMonitorActivity {
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS, 1);
runner.clearTransferState();
- processor.resetLastSuccessfulTransfer();
+ TimeUnit.MILLISECONDS.sleep(200);
runNext(runner);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE, 1);
@@ -78,7 +80,7 @@ public class TestMonitorActivity {
runner.clearTransferState();
runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "true");
- processor.resetLastSuccessfulTransfer();
+ TimeUnit.MILLISECONDS.sleep(200);
runNext(runner);
runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
@@ -101,19 +103,24 @@ public class TestMonitorActivity {
}
@Test
- public void testFirstMessageWithWaitForActivityTrue() {
+ public void testFirstMessageWithWaitForActivityTrue() throws
InterruptedException {
final TestableProcessor processor = new TestableProcessor(1000);
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
runner.setProperty(MonitorActivity.THRESHOLD, "100 millis");
runner.setProperty(MonitorActivity.WAIT_FOR_ACTIVITY, "true");
+ runner.run(1, false);
+ TimeUnit.MILLISECONDS.sleep(200);
+ runNext(runner);
+ runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
+
runner.enqueue(new byte[0]);
- runner.run();
+ runNext(runner);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS, 1);
runner.clearTransferState();
- processor.resetLastSuccessfulTransfer();
+ TimeUnit.MILLISECONDS.sleep(200);
runNext(runner);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE, 1);
@@ -136,7 +143,7 @@ public class TestMonitorActivity {
runner.clearTransferState();
runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "true");
- processor.resetLastSuccessfulTransfer();
+ TimeUnit.MILLISECONDS.sleep(200);
runNext(runner);
runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
@@ -157,7 +164,7 @@ public class TestMonitorActivity {
}
@Test
public void testReconcileAfterFirstStartWhenLastSuccessIsAlreadySet()
throws Exception {
- final String lastSuccessInCluster =
String.valueOf(System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(5));
+ final String lastSuccessInCluster = String.valueOf(currentTimeMillis()
- TimeUnit.MINUTES.toMillis(5));
final TestRunner runner = TestRunners.newTestRunner(new
TestableProcessor(0));
runner.setIsConfiguredForClustering(true);
runner.setPrimaryNode(true);
@@ -165,14 +172,175 @@ public class TestMonitorActivity {
runner.setProperty(MonitorActivity.REPORTING_NODE,
MonitorActivity.REPORT_NODE_PRIMARY);
runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
runner.setProperty(MonitorActivity.THRESHOLD, "5 secs");
-
runner.getStateManager().setState(Collections.singletonMap(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER,
lastSuccessInCluster), Scope.CLUSTER);
+ runner.getStateManager().setState(
+
singletonMap(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO,
lastSuccessInCluster), Scope.CLUSTER);
runner.enqueue("lorem ipsum");
runner.run(1, false);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
final StateMap updatedState =
runner.getStateManager().getState(Scope.CLUSTER);
- assertNotEquals(lastSuccessInCluster,
updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+ assertNotEquals(lastSuccessInCluster,
updatedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
+ }
+
+ @Test
+ public void
testReconcileAfterFirstStartWhenLastSuccessIsAlreadySetAndNoInput() throws
Exception {
+ final String lastSuccessInCluster = String.valueOf(currentTimeMillis()
- TimeUnit.MINUTES.toMillis(5));
+ final TestRunner runner = TestRunners.newTestRunner(new
TestableProcessor(0));
+ runner.setIsConfiguredForClustering(true);
+ runner.setPrimaryNode(true);
+ runner.setProperty(MonitorActivity.MONITORING_SCOPE,
MonitorActivity.SCOPE_CLUSTER);
+ runner.setProperty(MonitorActivity.REPORTING_NODE,
MonitorActivity.REPORT_NODE_PRIMARY);
+ runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
+ runner.setProperty(MonitorActivity.THRESHOLD, "5 secs");
+ runner.getStateManager().setState(
+
singletonMap(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO,
lastSuccessInCluster), Scope.CLUSTER);
+
+ runner.run(1, false);
+
+ final StateMap updatedState =
runner.getStateManager().getState(Scope.CLUSTER);
+ assertEquals(lastSuccessInCluster,
updatedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
+
+ runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
+ runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
+ runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
+ }
+
+ @Test
+ public void
testReconcileAfterFirstStartWhenLastSuccessIsAlreadySetAndNoInputButClusterIsActive()
throws Exception {
+ final String lastSuccessInCluster =
String.valueOf(currentTimeMillis());
+ final TestRunner runner = TestRunners.newTestRunner(new
TestableProcessor(0));
+ runner.setIsConfiguredForClustering(true);
+ runner.setPrimaryNode(true);
+ runner.setProperty(MonitorActivity.MONITORING_SCOPE,
MonitorActivity.SCOPE_CLUSTER);
+ runner.setProperty(MonitorActivity.REPORTING_NODE,
MonitorActivity.REPORT_NODE_PRIMARY);
+ runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
+ runner.setProperty(MonitorActivity.THRESHOLD, "5 minutes");
+ runner.getStateManager().setState(
+
singletonMap(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO,
lastSuccessInCluster), Scope.CLUSTER);
+
+ runner.run(1, false);
+
+ final StateMap updatedState =
runner.getStateManager().getState(Scope.CLUSTER);
+ assertEquals(lastSuccessInCluster,
updatedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
+
+ runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
+ runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
+ runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
+ }
+
+ @Test
+ public void
testReconcileAfterFirstStartWhenLastSuccessIsAlreadySetAndNoInputAndWasInactiveLastTime()
throws Exception {
+ final String lastSuccessInCluster = String.valueOf(currentTimeMillis()
- TimeUnit.MINUTES.toMillis(5));
+ final TestRunner runner = TestRunners.newTestRunner(new
TestableProcessor(0));
+ runner.setIsConfiguredForClustering(true);
+ runner.setPrimaryNode(true);
+ runner.setProperty(MonitorActivity.MONITORING_SCOPE,
MonitorActivity.SCOPE_CLUSTER);
+ runner.setProperty(MonitorActivity.REPORTING_NODE,
MonitorActivity.REPORT_NODE_PRIMARY);
+ runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
+ runner.setProperty(MonitorActivity.THRESHOLD, "5 secs");
+ runner.getStateManager().setState(
+
singletonMap(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO,
lastSuccessInCluster), Scope.CLUSTER);
+
+ runner.setProperty(MonitorActivity.RESET_STATE_ON_RESTART,
Boolean.FALSE.toString());
+ runner.getStateManager().setState(
+
singletonMap(MonitorActivity.STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO,
lastSuccessInCluster), Scope.LOCAL
+ );
+
+ runner.run(1, false);
+
+ final StateMap updatedState =
runner.getStateManager().getState(Scope.CLUSTER);
+ assertEquals(lastSuccessInCluster,
updatedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
+ final StateMap updatedLocalState =
runner.getStateManager().getState(Scope.LOCAL);
+ assertEquals(lastSuccessInCluster,
updatedLocalState.get(MonitorActivity.STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO));
+
+ runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
+ runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
+ runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
+ }
+
+ @Test
+ public void
testReconcileAfterFirstStartWhenLastSuccessIsAlreadySetAndNoInputAndWasActiveLastTime()
throws Exception {
+ final String lastSuccessInCluster =
String.valueOf(currentTimeMillis());
+ final TestRunner runner = TestRunners.newTestRunner(new
TestableProcessor(0));
+ runner.setIsConfiguredForClustering(true);
+ runner.setPrimaryNode(true);
+ runner.setProperty(MonitorActivity.MONITORING_SCOPE,
MonitorActivity.SCOPE_CLUSTER);
+ runner.setProperty(MonitorActivity.REPORTING_NODE,
MonitorActivity.REPORT_NODE_PRIMARY);
+ runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
+ runner.setProperty(MonitorActivity.THRESHOLD, "5 minutes");
+ runner.getStateManager().setState(
+
singletonMap(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO,
lastSuccessInCluster), Scope.CLUSTER);
+
+ runner.setProperty(MonitorActivity.RESET_STATE_ON_RESTART,
Boolean.FALSE.toString());
+ // if was active, there is no local state
+
+ runner.run(1, false);
+
+ final StateMap updatedState =
runner.getStateManager().getState(Scope.CLUSTER);
+ assertEquals(lastSuccessInCluster,
updatedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
+
+ runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
+ runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
+ runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
+ }
+
+ @Test
+ public void
testReconcileAfterFirstStartWhenLastSuccessIsAlreadySetAndNoInputAndTurnedInactiveSinceLastTime()
throws Exception {
+ final String lastSuccessInCluster = String.valueOf(currentTimeMillis()
- TimeUnit.MINUTES.toMillis(5));
+ final TestRunner runner = TestRunners.newTestRunner(new
TestableProcessor(0));
+ runner.setIsConfiguredForClustering(true);
+ runner.setPrimaryNode(true);
+ runner.setProperty(MonitorActivity.MONITORING_SCOPE,
MonitorActivity.SCOPE_CLUSTER);
+ runner.setProperty(MonitorActivity.REPORTING_NODE,
MonitorActivity.REPORT_NODE_PRIMARY);
+ runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
+ runner.setProperty(MonitorActivity.THRESHOLD, "5 secs");
+ runner.getStateManager().setState(
+
singletonMap(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO,
lastSuccessInCluster), Scope.CLUSTER);
+
+ runner.setProperty(MonitorActivity.RESET_STATE_ON_RESTART,
Boolean.FALSE.toString());
+ // if was active, there is no local state
+
+ runner.run(1, false);
+
+ final StateMap updatedClusterState =
runner.getStateManager().getState(Scope.CLUSTER);
+ assertEquals(lastSuccessInCluster,
updatedClusterState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
+ final StateMap updatedLocalState =
runner.getStateManager().getState(Scope.LOCAL);
+ assertEquals(lastSuccessInCluster,
updatedLocalState.get(MonitorActivity.STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO));
+
+ runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
+ runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
+ runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
+ }
+
+ @Test
+ public void
testReconcileAfterFirstStartWhenLastSuccessIsAlreadySetAndNoInputAndTurnedActiveSinceLastTime()
throws Exception {
+ final String lastSuccessInCluster =
String.valueOf(currentTimeMillis());
+ final String lastSuccessInLocal = String.valueOf(currentTimeMillis() -
TimeUnit.MINUTES.toMillis(5));
+ final TestRunner runner = TestRunners.newTestRunner(new
TestableProcessor(0));
+ runner.setIsConfiguredForClustering(true);
+ runner.setPrimaryNode(true);
+ runner.setProperty(MonitorActivity.MONITORING_SCOPE,
MonitorActivity.SCOPE_CLUSTER);
+ runner.setProperty(MonitorActivity.REPORTING_NODE,
MonitorActivity.REPORT_NODE_PRIMARY);
+ runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
+ runner.setProperty(MonitorActivity.THRESHOLD, "5 secs");
+ runner.getStateManager().setState(
+
singletonMap(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO,
lastSuccessInCluster), Scope.CLUSTER);
+
+ runner.setProperty(MonitorActivity.RESET_STATE_ON_RESTART,
Boolean.FALSE.toString());
+ runner.getStateManager().setState(
+
singletonMap(MonitorActivity.STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO,
lastSuccessInLocal), Scope.LOCAL
+ );
+
+ runner.run(1, false);
+
+ final StateMap updatedState =
runner.getStateManager().getState(Scope.CLUSTER);
+ assertEquals(lastSuccessInCluster,
updatedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
+
assertNull(runner.getStateManager().getState(Scope.LOCAL).get(MonitorActivity.STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO));
+
+ runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
+ runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
+ runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
}
@Test
@@ -188,33 +356,41 @@ public class TestMonitorActivity {
runner.setConnected(false);
runner.enqueue("lorem ipsum");
- runner.run(1, false, false);
+ runner.run(1, false);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
runner.setConnected(true);
- runner.run(1, false, false);
+ runNext(runner);
- final long tLocal = processor.getLatestSuccessTransfer();
+ final long tLocal = processor.getLastSuccessfulTransfer();
final long tCluster = getLastSuccessFromCluster(runner);
assertEquals(tLocal, tCluster);
}
@Test
- public void testReconcileAfterReconnectWhenPrimary() throws
InterruptedException {
+ public void testReconcileAfterReconnectWhenPrimary() throws
InterruptedException, IOException {
final TestRunner runner = getRunnerScopeCluster(new MonitorActivity(),
true);
+ final StateManager stateManager = runner.getStateManager();
// First trigger will write last success transfer into cluster.
runner.enqueue("lorem ipsum");
- runNext(runner);
+ runner.run(1, false);
+
+ final String lastSuccessTransferAfterFirstTrigger =
stateManager.getState(Scope.CLUSTER)
+ .get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO);
assertTransferCountSuccessInactiveRestored(runner, 1, 0);
// At second trigger it's not connected, new last success transfer
stored only locally.
runner.setConnected(false);
runner.enqueue("lorem ipsum");
+ TimeUnit.MILLISECONDS.sleep(500); // This sleep is needed to
guarantee, that the stored timestamp will be different.
runNext(runner);
+ assertEquals(lastSuccessTransferAfterFirstTrigger,
+
stateManager.getState(Scope.CLUSTER).get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
+
assertTransferCountSuccessInactiveRestored(runner, 2, 0);
// The third trigger is without flow file, but reconcile is triggered
and value is written to cluster.
@@ -222,32 +398,46 @@ public class TestMonitorActivity {
TimeUnit.MILLISECONDS.sleep(500);
runNext(runner);
+ assertNotEquals(lastSuccessTransferAfterFirstTrigger,
+
stateManager.getState(Scope.CLUSTER).get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
+
// Inactive message is being sent after the connection is back.
assertTransferCountSuccessInactiveRestored(runner,2, 1);
}
@Test
- public void testReconcileAfterReconnectWhenNotPrimary() {
+ public void testReconcileAfterReconnectWhenNotPrimary() throws
IOException, InterruptedException {
final TestableProcessor processor = new TestableProcessor(1000);
final TestRunner runner = getRunnerScopeCluster(processor, false);
+ final StateManager stateManager = runner.getStateManager();
// First trigger will write last success transfer into cluster.
runner.enqueue("lorem ipsum");
- runNext(runner);
+ runner.run(1, false);
+
+ final String lastSuccessTransferAfterFirstTrigger =
stateManager.getState(Scope.CLUSTER)
+ .get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO);
assertTransferCountSuccessInactiveRestored(runner, 1, 0);
// At second trigger it's not connected, new last success transfer
stored only locally.
runner.setConnected(false);
runner.enqueue("lorem ipsum");
+ TimeUnit.MILLISECONDS.sleep(500); // This sleep is needed to
guarantee, that the stored timestamp will be different.
runNext(runner);
+ assertEquals(lastSuccessTransferAfterFirstTrigger,
+
stateManager.getState(Scope.CLUSTER).get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
+
assertTransferCountSuccessInactiveRestored(runner, 2, 0);
// The third trigger is without flow file, but reconcile is triggered
and value is written to cluster.
runner.setConnected(true);
runNext(runner);
+ assertNotEquals(lastSuccessTransferAfterFirstTrigger,
+
stateManager.getState(Scope.CLUSTER).get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
+
// No inactive message because the node is not primary
assertTransferCountSuccessInactiveRestored(runner, 2, 0);
}
@@ -269,7 +459,7 @@ public class TestMonitorActivity {
}
private Long getLastSuccessFromCluster(final TestRunner runner) throws
IOException {
- return
Long.valueOf(runner.getStateManager().getState(Scope.CLUSTER).get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+ return
Long.valueOf(runner.getStateManager().getState(Scope.CLUSTER).get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
}
private void assertTransferCountSuccessInactiveRestored(TestRunner runner,
final int success, final int inactive) {
@@ -303,7 +493,7 @@ public class TestMonitorActivity {
}
@Test
- public void testFirstMessageWithInherit() {
+ public void testFirstMessageWithInherit() throws InterruptedException {
final TestableProcessor processor = new TestableProcessor(1000);
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
@@ -311,12 +501,12 @@ public class TestMonitorActivity {
runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
runner.enqueue(new byte[0]);
- runner.run();
+ runner.run(1, false);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS, 1);
MockFlowFile originalFlowFile =
runner.getFlowFilesForRelationship(MonitorActivity.REL_SUCCESS).get(0);
runner.clearTransferState();
- processor.resetLastSuccessfulTransfer();
+ TimeUnit.MILLISECONDS.sleep(200);
runNext(runner);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE, 1);
@@ -346,7 +536,7 @@ public class TestMonitorActivity {
runner.clearTransferState();
runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "true");
- processor.resetLastSuccessfulTransfer();
+ TimeUnit.MILLISECONDS.sleep(200);
runNext(runner);
runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
@@ -383,7 +573,7 @@ public class TestMonitorActivity {
rerun = false;
runner.setProperty(MonitorActivity.THRESHOLD, threshold + "
millis");
- Thread.sleep(1000L);
+ TimeUnit.MILLISECONDS.sleep(1000L);
// shouldn't generate inactivity b/c run() will reset the
lastSuccessfulTransfer if @OnSchedule & onTrigger
// does not get called more than MonitorActivity.THRESHOLD apart
@@ -410,30 +600,18 @@ public class TestMonitorActivity {
*/
private static class TestableProcessor extends MonitorActivity {
- private final long timestampDifference;
+ private final long startupTime;
public TestableProcessor(final long timestampDifference) {
- this.timestampDifference = timestampDifference;
+ this.startupTime = currentTimeMillis() - timestampDifference;
}
@Override
- public void resetLastSuccessfulTransfer() {
- setLastSuccessfulTransfer(System.currentTimeMillis() -
timestampDifference);
+ protected long getStartupTime() {
+ return startupTime;
}
}
- @Test
- public void testClusterMonitorInvalidReportingNode() {
- final TestRunner runner = TestRunners.newTestRunner(new
TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
-
- runner.setIsConfiguredForClustering(true);
- runner.setPrimaryNode(false);
- runner.setProperty(MonitorActivity.MONITORING_SCOPE,
MonitorActivity.SCOPE_NODE);
- runner.setProperty(MonitorActivity.REPORTING_NODE,
MonitorActivity.REPORT_NODE_PRIMARY);
-
- runner.assertNotValid();
- }
-
@Test
public void testClusterMonitorActive() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new
TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
@@ -445,12 +623,12 @@ public class TestMonitorActivity {
runner.enqueue("Incoming data");
- runner.run();
+ runner.run(1, false);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
final StateMap updatedState =
runner.getStateManager().getState(Scope.CLUSTER);
-
assertNotNull(updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+
assertNotNull(updatedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
// Should be null because COPY_ATTRIBUTES is null.
assertNull(updatedState.get("key1"));
assertNull(updatedState.get("key2"));
@@ -472,7 +650,7 @@ public class TestMonitorActivity {
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
final StateMap updatedState =
runner.getStateManager().getState(Scope.CLUSTER);
-
assertNull(updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+
assertNull(updatedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
}
@Test
@@ -488,21 +666,22 @@ public class TestMonitorActivity {
// Set past timestamp in state
final HashMap<String, String> existingState = new HashMap<>();
- final long existingTimestamp = System.currentTimeMillis() - 1_000;
- existingState.put(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER,
+ final long existingTimestamp = currentTimeMillis() - 1_000;
+ existingState.put(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO,
String.valueOf(existingTimestamp));
existingState.put("key1", "value1");
existingState.put("key2", "value2");
runner.getStateManager().setState(existingState, Scope.CLUSTER);
runner.getStateManager().replace(runner.getStateManager().getState(Scope.CLUSTER),
existingState, Scope.CLUSTER);
- runner.run();
+ runner.run(1, false);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
final StateMap postProcessedState =
runner.getStateManager().getState(Scope.CLUSTER);
- assertTrue( existingTimestamp <
Long.parseLong(postProcessedState.get(
- MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER)));
+ long postProcessedTimestamp = Long.parseLong(postProcessedState.get(
+ MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
+ assertTrue(existingTimestamp < postProcessedTimestamp);
// State should be updated. Null in this case.
assertNull(postProcessedState.get("key1"));
assertNull(postProcessedState.get("key2"));
@@ -521,22 +700,22 @@ public class TestMonitorActivity {
// Set future timestamp in state
final HashMap<String, String> existingState = new HashMap<>();
- final long existingTimestamp = System.currentTimeMillis() + 10_000;
- existingState.put(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER,
+ final long existingTimestamp = currentTimeMillis() + 10_000;
+ existingState.put(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO,
String.valueOf(existingTimestamp));
existingState.put("key1", "value1");
existingState.put("key2", "value2");
runner.getStateManager().setState(existingState, Scope.CLUSTER);
runner.getStateManager().replace(runner.getStateManager().getState(Scope.CLUSTER),
existingState, Scope.CLUSTER);
- runner.run();
+ runner.run(1, false);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
final StateMap postProcessedState =
runner.getStateManager().getState(Scope.CLUSTER);
assertEquals(
String.valueOf(existingTimestamp),
-
postProcessedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+
postProcessedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
// State should stay the same.
assertEquals(postProcessedState.get("key1"),
existingState.get("key1"));
assertEquals(postProcessedState.get("key2"),
existingState.get("key2"));
@@ -557,12 +736,12 @@ public class TestMonitorActivity {
attributes.put("key2", "value2");
runner.enqueue("Incoming data", attributes);
- runner.run();
+ runner.run(1, false);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
final StateMap updatedState =
runner.getStateManager().getState(Scope.CLUSTER);
-
assertNotNull(updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+
assertNotNull(updatedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
assertEquals("value1", updatedState.get("key1"));
assertEquals("value2", updatedState.get("key2"));
}
@@ -689,7 +868,7 @@ public class TestMonitorActivity {
// Latest activity should be persisted
final StateMap updatedState =
runner.getStateManager().getState(Scope.CLUSTER);
-
assertNotNull(updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+
assertNotNull(updatedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
assertEquals("value1", updatedState.get("key1"));
assertEquals("value2", updatedState.get("key2"));
runner.clearTransferState();
@@ -723,7 +902,7 @@ public class TestMonitorActivity {
// Latest activity should be persisted
final StateMap updatedState =
runner.getStateManager().getState(Scope.CLUSTER);
-
assertNotNull(updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+
assertNotNull(updatedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
assertEquals("value1", updatedState.get("key1"));
assertEquals("value2", updatedState.get("key2"));
runner.clearTransferState();
@@ -762,7 +941,7 @@ public class TestMonitorActivity {
// Latest activity should be persisted
final StateMap updatedState =
runner.getStateManager().getState(Scope.CLUSTER);
-
assertNotNull(updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+
assertNotNull(updatedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
assertEquals("value1", updatedState.get("key1"));
assertEquals("value2", updatedState.get("key2"));
runner.clearTransferState();
@@ -801,7 +980,7 @@ public class TestMonitorActivity {
// Latest activity should NOT be persisted
final StateMap updatedState =
runner.getStateManager().getState(Scope.CLUSTER);
-
assertNull(updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+
assertNull(updatedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
runner.clearTransferState();
}
@@ -812,22 +991,25 @@ public class TestMonitorActivity {
runner.setIsConfiguredForClustering(true);
runner.setPrimaryNode(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE,
MonitorActivity.SCOPE_CLUSTER);
- runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
+ runner.setProperty(MonitorActivity.THRESHOLD, "10 sec");
runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
// Becomes inactive
- runner.run();
+ runner.run(1, false);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
runner.clearTransferState();
// Activity restored, even if this node doesn't have activity, other
node updated the cluster state.
final HashMap<String, String> clusterState = new HashMap<>();
- clusterState.put(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER,
String.valueOf(System.currentTimeMillis()));
+ clusterState.put(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO,
String.valueOf(currentTimeMillis()));
clusterState.put("key1", "value1");
clusterState.put("key2", "value2");
runner.getStateManager().setState(clusterState, Scope.CLUSTER);
runner.getStateManager().replace(runner.getStateManager().getState(Scope.CLUSTER),
clusterState, Scope.CLUSTER);
+ // Common state is not sampled on each trigger. We need to wait a
little to get notified about the update.
+ TimeUnit.MILLISECONDS.sleep(3334); // Sampling rate is threshold/3
+
runNext(runner);
final List<MockFlowFile> successFiles =
runner.getFlowFilesForRelationship(MonitorActivity.REL_SUCCESS);
final List<MockFlowFile> activityRestoredFiles =
runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED);
@@ -849,22 +1031,25 @@ public class TestMonitorActivity {
runner.setPrimaryNode(true);
runner.setProperty(MonitorActivity.MONITORING_SCOPE,
MonitorActivity.SCOPE_CLUSTER);
runner.setProperty(MonitorActivity.REPORTING_NODE,
MonitorActivity.REPORT_NODE_PRIMARY);
- runner.setProperty(MonitorActivity.THRESHOLD, "1 hour");
+ runner.setProperty(MonitorActivity.THRESHOLD, "10 sec");
runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
// Becomes inactive
- runner.run();
+ runner.run(1, false);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
runner.clearTransferState();
// Activity restored, even if this node doesn't have activity, other
node updated the cluster state.
final HashMap<String, String> clusterState = new HashMap<>();
- clusterState.put(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER,
String.valueOf(System.currentTimeMillis()));
+ clusterState.put(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO,
String.valueOf(currentTimeMillis()));
clusterState.put("key1", "value1");
clusterState.put("key2", "value2");
runner.getStateManager().setState(clusterState, Scope.CLUSTER);
runner.getStateManager().replace(runner.getStateManager().getState(Scope.CLUSTER),
clusterState, Scope.CLUSTER);
+ // Common state is not sampled on each trigger. We need to wait a
little to get notified about the update.
+ TimeUnit.MILLISECONDS.sleep(3334); // Sampling rate is threshold/3
+
runNext(runner);
final List<MockFlowFile> successFiles =
runner.getFlowFilesForRelationship(MonitorActivity.REL_SUCCESS);
final List<MockFlowFile> activityRestoredFiles =
runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED);
@@ -892,7 +1077,7 @@ public class TestMonitorActivity {
// Activity restored, even if this node doesn't have activity, other
node updated the cluster state.
final HashMap<String, String> clusterState = new HashMap<>();
- clusterState.put(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER,
String.valueOf(System.currentTimeMillis()));
+ clusterState.put(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO,
String.valueOf(currentTimeMillis()));
clusterState.put("key1", "value1");
clusterState.put("key2", "value2");
runner.getStateManager().setState(clusterState, Scope.CLUSTER);
@@ -906,4 +1091,288 @@ public class TestMonitorActivity {
}
-}
\ No newline at end of file
+ @Test
+ public void testDisconnectedNodeActivatesFlow() {
+ final TestRunner runner = TestRunners.newTestRunner(new
TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
+ runner.setIsConfiguredForClustering(true);
+ runner.setConnected(true);
+ runner.setPrimaryNode(false);
+ runner.setProperty(MonitorActivity.MONITORING_SCOPE,
MonitorActivity.SCOPE_CLUSTER);
+ runner.setProperty(MonitorActivity.REPORTING_NODE,
MonitorActivity.REPORT_NODE_ALL);
+ runner.setProperty(MonitorActivity.THRESHOLD, "1 min");
+
+ // First trigger while connected: expect exactly one inactivity marker
+ runner.run(1, false);
+ runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
+ runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
+ runner.clearTransferState();
+
+ // Disconnect the node, and feed in a flow file
+ runner.setConnected(false);
+ runner.enqueue("Incoming data");
+ runNext(runner);
+
+ // Expect both the activity-restored marker and the forwarded FF
+ runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1);
+ runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
+ }
+
+ @Test
+ public void testDisconnectedNodeDeactivatesFlowOnlyWhenConnected() {
+ final TestRunner runner = TestRunners.newTestRunner(new
TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
+ runner.setIsConfiguredForClustering(true);
+ runner.setConnected(false);
+ runner.setPrimaryNode(false);
+ runner.setProperty(MonitorActivity.MONITORING_SCOPE,
MonitorActivity.SCOPE_CLUSTER);
+ runner.setProperty(MonitorActivity.REPORTING_NODE,
MonitorActivity.REPORT_NODE_ALL);
+ runner.setProperty(MonitorActivity.THRESHOLD, "1 min");
+
+ // Node is disconnected: the first trigger must not emit an inactivity marker
+ runner.run(1, false);
+ runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
+ runner.clearTransferState();
+
+ // Reconnect the node, and expect the inactivity marker
+ runner.setConnected(true);
+ runNext(runner);
+ runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
+ runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
+ }
+
+ @Test
+ public void testLocalStateIsNotDeletedInStandaloneCaseWhenStopped() throws
IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new
TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
+ runner.setIsConfiguredForClustering(false);
+ runner.setProperty(MonitorActivity.MONITORING_SCOPE,
MonitorActivity.SCOPE_NODE);
+ runner.setProperty(MonitorActivity.REPORTING_NODE,
MonitorActivity.REPORT_NODE_ALL);
+ runner.setProperty(MonitorActivity.THRESHOLD, "1 min");
+
+ // Becomes inactive; the local state must be non-empty after this trigger
+ runner.run(1, false);
+ runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
+ runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
+
assertFalse(runner.getStateManager().getState(Scope.LOCAL).toMap().isEmpty());
+
+ // Stop the processor and expect the local state to still be there
+ runner.stop();
+
assertFalse(runner.getStateManager().getState(Scope.LOCAL).toMap().isEmpty());
+ }
+
+ @Test
+ public void
testLocalStateIsNotDeletedInClusteredCaseNodeScopeWhenStopped() throws
IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new
TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
+ runner.setIsConfiguredForClustering(true);
+ runner.setConnected(true);
+ runner.setPrimaryNode(true);
+ runner.setProperty(MonitorActivity.MONITORING_SCOPE,
MonitorActivity.SCOPE_NODE);
+ runner.setProperty(MonitorActivity.REPORTING_NODE,
MonitorActivity.REPORT_NODE_ALL);
+ runner.setProperty(MonitorActivity.THRESHOLD, "1 min");
+
+ // Becomes inactive; the local state must be non-empty after this trigger
+ runner.run(1, false);
+ runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
+ runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
+
assertFalse(runner.getStateManager().getState(Scope.LOCAL).toMap().isEmpty());
+
+ // Stop the processor and expect the local state to still be there
+ runner.stop();
+
assertFalse(runner.getStateManager().getState(Scope.LOCAL).toMap().isEmpty());
+ }
+
+ @Test
+ public void
testLocalStateIsNotDeletedInClusteredCaseClusterScopeWhenStopped() throws
IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new
TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
+ runner.setIsConfiguredForClustering(true);
+ runner.setConnected(true);
+ runner.setPrimaryNode(true);
+ runner.setProperty(MonitorActivity.MONITORING_SCOPE,
MonitorActivity.SCOPE_CLUSTER);
+ runner.setProperty(MonitorActivity.REPORTING_NODE,
MonitorActivity.REPORT_NODE_ALL);
+ runner.setProperty(MonitorActivity.THRESHOLD, "1 min");
+
+ // Becomes inactive; the local state must be non-empty after this trigger
+ runner.run(1, false);
+ runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
+ runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
+
assertFalse(runner.getStateManager().getState(Scope.LOCAL).toMap().isEmpty());
+
+ // Stop the processor and expect the local state to still be there
+ runner.stop();
+
assertFalse(runner.getStateManager().getState(Scope.LOCAL).toMap().isEmpty());
+ }
+
+ @Test
+ public void
testLocalStateIsNotDeletedInClusteredCaseWhenDisconnectedAndStopped() throws
IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new
TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
+ runner.setIsConfiguredForClustering(true);
+ runner.setConnected(true);
+ runner.setPrimaryNode(true);
+ runner.setProperty(MonitorActivity.MONITORING_SCOPE,
MonitorActivity.SCOPE_CLUSTER);
+ runner.setProperty(MonitorActivity.REPORTING_NODE,
MonitorActivity.REPORT_NODE_ALL);
+ runner.setProperty(MonitorActivity.THRESHOLD, "1 min");
+
+ // Becomes inactive; the local state must be non-empty after this trigger
+ runner.run(1, false);
+ runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
+ runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
+
assertFalse(runner.getStateManager().getState(Scope.LOCAL).toMap().isEmpty());
+
+ // Disconnect the node & stop the processor and expect the local state
still there
+ runner.setConnected(false);
+ runner.stop();
+
assertFalse(runner.getStateManager().getState(Scope.LOCAL).toMap().isEmpty());
+ }
+
+ @Test
+ public void
testActivationMarkerIsImmediateWhenAnyOtherNodeActivatesTheFlow() throws
IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new
TestableProcessor(TimeUnit.DAYS.toMillis(1)));
+ runner.setIsConfiguredForClustering(true);
+ runner.setConnected(true);
+ runner.setPrimaryNode(true);
+ runner.setProperty(MonitorActivity.MONITORING_SCOPE,
MonitorActivity.SCOPE_CLUSTER);
+ runner.setProperty(MonitorActivity.REPORTING_NODE,
MonitorActivity.REPORT_NODE_PRIMARY);
+ runner.setProperty(MonitorActivity.THRESHOLD, "3 hours");
+
+ // First trigger: expect exactly one inactivity marker
+ runner.run(1, false);
+ runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
+ runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
+ runner.clearTransferState();
+
+ // Write a fresh timestamp into the cluster state (no FlowFile is enqueued on this node)
+
runner.getStateManager().setState(singletonMap(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO,
String.valueOf(currentTimeMillis())), Scope.CLUSTER);
+ runNext(runner);
+
+ // Expect the activity-restored marker only, on the very next trigger
+ runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
+
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_ACTIVITY_RESTORED);
+ }
+
+ @Test
+ public void
testDisconnectNodeAndActivateBothTheOtherNodesAndTheDisconnectedNodeIndependently()
throws IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new
TestableProcessor(TimeUnit.DAYS.toMillis(1)));
+ runner.setIsConfiguredForClustering(true);
+ runner.setConnected(true);
+ runner.setPrimaryNode(false);
+ runner.setProperty(MonitorActivity.MONITORING_SCOPE,
MonitorActivity.SCOPE_CLUSTER);
+ runner.setProperty(MonitorActivity.REPORTING_NODE,
MonitorActivity.REPORT_NODE_ALL);
+ runner.setProperty(MonitorActivity.THRESHOLD, "3 hours");
+
+ // First trigger while connected: expect exactly one inactivity marker
+ runner.run(1, false);
+ runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
+ runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
+ runner.clearTransferState();
+
+ // Disconnect the node, and feed in a flow file to activate it
+ runner.setConnected(false);
+ runner.enqueue("Incoming data");
+ runNext(runner);
+
+ // Expect both the activity-restored marker and the forwarded FF
+ runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1);
+ runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
+ runner.clearTransferState();
+
+ // Update the cluster state too, and reconnect. This simulates other
nodes being activated.
+
runner.getStateManager().setState(singletonMap(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO,
String.valueOf(currentTimeMillis())), Scope.CLUSTER);
+ runner.setConnected(true);
+ runNext(runner);
+
+ // We expect no output on any relationship (no second activity-restored marker)
+ runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
+ runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
+ runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
+ }
+
+ @Test
+ public void testClusterStateIsImmediatelyUpdatedOnActivation() throws
IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new
TestableProcessor(TimeUnit.DAYS.toMillis(1)));
+ runner.setIsConfiguredForClustering(true);
+ runner.setConnected(true);
+ runner.setPrimaryNode(false);
+ runner.setProperty(MonitorActivity.MONITORING_SCOPE,
MonitorActivity.SCOPE_CLUSTER);
+ runner.setProperty(MonitorActivity.REPORTING_NODE,
MonitorActivity.REPORT_NODE_ALL);
+ runner.setProperty(MonitorActivity.THRESHOLD, "3 hours");
+
+ // Becomes inactive; the common activity key in cluster state is still unset afterwards
+ runner.run(1, false);
+ runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
+ runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
+ runner.clearTransferState();
+
+
assertNull(runner.getStateManager().getState(Scope.CLUSTER).get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
+ runner.enqueue("Incoming data");
+ runNext(runner);
+
assertNotNull(runner.getStateManager().getState(Scope.CLUSTER).get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
+ }
+
+ @Test
+ public void testResetStateOnStartupByDefault() throws IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new
MonitorActivity());
+ runner.setIsConfiguredForClustering(false);
+ runner.setProperty(MonitorActivity.MONITORING_SCOPE,
MonitorActivity.SCOPE_NODE);
+ runner.setProperty(MonitorActivity.THRESHOLD, "24 hours");
+
+ runner.getStateManager().setState(
+ singletonMap(
+ MonitorActivity.STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO,
+ String.valueOf(currentTimeMillis() -
TimeUnit.DAYS.toMillis(1))
+ ),
+ Scope.LOCAL
+ );
+
+ runner.enqueue("Incoming data");
+ runner.run();
+
+ // We expect only the forwarded FF as output (no marker despite the day-old local state)
+ runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
+ }
+
+ @Test
+ public void testResetStateOnStartupDisabled() throws IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new
MonitorActivity());
+ runner.setIsConfiguredForClustering(false);
+ runner.setProperty(MonitorActivity.MONITORING_SCOPE,
MonitorActivity.SCOPE_NODE);
+ runner.setProperty(MonitorActivity.THRESHOLD, "24 hours");
+ runner.setProperty(MonitorActivity.RESET_STATE_ON_RESTART,
Boolean.FALSE.toString());
+
+ runner.getStateManager().setState(
+ singletonMap(
+ MonitorActivity.STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO,
+ String.valueOf(currentTimeMillis() -
TimeUnit.DAYS.toMillis(1))
+ ),
+ Scope.LOCAL
+ );
+
+ runner.enqueue("Incoming data");
+ runner.run();
+
+ // We expect both the forwarded FF and the activity-restored marker
+ runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1);
+ runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
+ }
+
+ @Test
+ public void
testMultipleFlowFilesActivateTheFlowInSingleTriggerResultsInSingleMarker()
throws IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new
TestableProcessor(TimeUnit.DAYS.toMillis(1)));
+ runner.setIsConfiguredForClustering(false);
+ runner.setProperty(MonitorActivity.MONITORING_SCOPE,
MonitorActivity.SCOPE_NODE);
+ runner.setProperty(MonitorActivity.THRESHOLD, "3 hours");
+
+ // First trigger: expect exactly one inactivity marker
+ runner.run(1, false);
+ runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
+ runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
+ runner.clearTransferState();
+
+ // Enqueue three flow files before the next trigger
+ runner.enqueue("Incoming data 1");
+ runner.enqueue("Incoming data 2");
+ runner.enqueue("Incoming data 3");
+ runNext(runner);
+
+ // We expect all three FFs forwarded, but only a single activity-restored marker
+ runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 3);
+ runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
+ }
+}