Repository: nifi Updated Branches: refs/heads/master b17be66a1 -> d421e3c24
NIFI-846 MonitorActivity not setting start of evaluation period correctly Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d421e3c2 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d421e3c2 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d421e3c2 Branch: refs/heads/master Commit: d421e3c2424b224522e9e5a83f014ccda2496739 Parents: b17be66 Author: Bryan Bende <[email protected]> Authored: Fri Aug 21 11:06:20 2015 -0400 Committer: Bryan Bende <[email protected]> Committed: Fri Aug 21 11:06:20 2015 -0400 ---------------------------------------------------------------------- .../processors/standard/MonitorActivity.java | 42 ++++++++++++-------- .../standard/TestMonitorActivity.java | 39 +++++++++++++++++- 2 files changed, 63 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/d421e3c2/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java index 99a29e5..2900623 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java @@ -16,27 +16,14 @@ */ package org.apache.nifi.processors.standard; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; @@ -49,6 +36,20 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + @SideEffectFree @TriggerSerially @TriggerWhenEmpty @@ -149,6 +150,15 @@ public class MonitorActivity extends AbstractProcessor { return properties; } + @OnScheduled + public void resetLastSuccessfulTransfer() { + setLastSuccessfulTransfer(System.currentTimeMillis()); + } + + protected final void setLastSuccessfulTransfer(final long timestamp) { + latestSuccessTransfer.set(timestamp); + } + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) { final long thresholdMillis = context.getProperty(THRESHOLD).asTimePeriod(TimeUnit.MILLISECONDS); http://git-wip-us.apache.org/repos/asf/nifi/blob/d421e3c2/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java index 2e87441..f02e6da 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java @@ -31,7 +31,7 @@ public class TestMonitorActivity { @Test public void testFirstMessage() throws InterruptedException, IOException { - final TestRunner runner = TestRunners.newTestRunner(new MonitorActivity()); + final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(1000L)); runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false"); runner.setProperty(MonitorActivity.THRESHOLD, "100 millis"); @@ -101,7 +101,7 @@ public class TestMonitorActivity { @Test public void testFirstMessageWithInherit() throws InterruptedException, IOException { - final TestRunner runner = TestRunners.newTestRunner(new MonitorActivity()); + final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(1000L)); runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false"); runner.setProperty(MonitorActivity.THRESHOLD, "100 millis"); runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true"); @@ -188,4 +188,39 @@ public class TestMonitorActivity { String.format("lineage start dates match when they shouldn't original=%1$s restored=%2$s", originalFlowFile.getLineageStartDate(), restoredFlowFile.getLineageStartDate()), restoredFlowFile.getLineageStartDate() != originalFlowFile.getLineageStartDate()); } + + @Test + public void testFirstRunNoMessages() throws InterruptedException, IOException { + // don't use the TestableProcessor, we want the real timestamp from @OnScheduled + final TestRunner runner = TestRunners.newTestRunner(new MonitorActivity()); + runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false"); + runner.setProperty(MonitorActivity.THRESHOLD, "100 millis"); + + Thread.sleep(1000L); + + // shouldn't generate inactivity b/c run() will reset the lastSuccessfulTransfer + runner.run(); + runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0); + runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0); + runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0); + runner.clearTransferState(); + } + + /** + * Since each call to run() will call @OnScheduled methods which will set the lastSuccessfulTransfer to the + * current time, we need a way to create an artificial time difference between calls to run. + */ + private class TestableProcessor extends MonitorActivity { + + private final long timestampDifference; + + public TestableProcessor(final long timestampDifference) { + this.timestampDifference = timestampDifference; + } + + @Override + public void resetLastSuccessfulTransfer() { + setLastSuccessfulTransfer(System.currentTimeMillis() - timestampDifference); + } + } }
