Repository: nifi
Updated Branches:
  refs/heads/master b17be66a1 -> d421e3c24


NIFI-846 MonitorActivity not setting start of evaluation period correctly


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d421e3c2
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d421e3c2
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d421e3c2

Branch: refs/heads/master
Commit: d421e3c2424b224522e9e5a83f014ccda2496739
Parents: b17be66
Author: Bryan Bende <[email protected]>
Authored: Fri Aug 21 11:06:20 2015 -0400
Committer: Bryan Bende <[email protected]>
Committed: Fri Aug 21 11:06:20 2015 -0400

----------------------------------------------------------------------
 .../processors/standard/MonitorActivity.java    | 42 ++++++++++++--------
 .../standard/TestMonitorActivity.java           | 39 +++++++++++++++++-
 2 files changed, 63 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/d421e3c2/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
index 99a29e5..2900623 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
@@ -16,27 +16,14 @@
  */
 package org.apache.nifi.processors.standard;
 
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -49,6 +36,20 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
 @SideEffectFree
 @TriggerSerially
 @TriggerWhenEmpty
@@ -149,6 +150,15 @@ public class MonitorActivity extends AbstractProcessor {
         return properties;
     }
 
+    @OnScheduled
+    public void resetLastSuccessfulTransfer() {
+        setLastSuccessfulTransfer(System.currentTimeMillis());
+    }
+
+    protected final void setLastSuccessfulTransfer(final long timestamp) {
+        latestSuccessTransfer.set(timestamp);
+    }
+
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
         final long thresholdMillis = 
context.getProperty(THRESHOLD).asTimePeriod(TimeUnit.MILLISECONDS);

http://git-wip-us.apache.org/repos/asf/nifi/blob/d421e3c2/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java
index 2e87441..f02e6da 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java
@@ -31,7 +31,7 @@ public class TestMonitorActivity {
 
     @Test
     public void testFirstMessage() throws InterruptedException, IOException {
-        final TestRunner runner = TestRunners.newTestRunner(new 
MonitorActivity());
+        final TestRunner runner = TestRunners.newTestRunner(new 
TestableProcessor(1000L));
         runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
         runner.setProperty(MonitorActivity.THRESHOLD, "100 millis");
 
@@ -101,7 +101,7 @@ public class TestMonitorActivity {
 
     @Test
     public void testFirstMessageWithInherit() throws InterruptedException, 
IOException {
-        final TestRunner runner = TestRunners.newTestRunner(new 
MonitorActivity());
+        final TestRunner runner = TestRunners.newTestRunner(new 
TestableProcessor(1000L));
         runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
         runner.setProperty(MonitorActivity.THRESHOLD, "100 millis");
         runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
@@ -188,4 +188,39 @@ public class TestMonitorActivity {
                 String.format("lineage start dates match when they shouldn't 
original=%1$s restored=%2$s",
                         originalFlowFile.getLineageStartDate(), 
restoredFlowFile.getLineageStartDate()), restoredFlowFile.getLineageStartDate() 
!= originalFlowFile.getLineageStartDate());
     }
+
+    @Test
+    public void testFirstRunNoMessages() throws InterruptedException, 
IOException {
+        // don't use the TestableProcessor, we want the real timestamp from 
@OnScheduled
+        final TestRunner runner = TestRunners.newTestRunner(new 
MonitorActivity());
+        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
+        runner.setProperty(MonitorActivity.THRESHOLD, "100 millis");
+
+        Thread.sleep(1000L);
+
+        // shouldn't generate inactivity b/c run() will reset the 
lastSuccessfulTransfer
+        runner.run();
+        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
+        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
+        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
+        runner.clearTransferState();
+    }
+
+    /**
+     * Since each call to run() will call @OnScheduled methods which will set 
the lastSuccessfulTransfer to the
+     * current time, we need a way to create an artificial time difference 
between calls to run.
+     */
+    private class TestableProcessor extends MonitorActivity {
+
+        private final long timestampDifference;
+
+        public TestableProcessor(final long timestampDifference) {
+            this.timestampDifference = timestampDifference;
+        }
+
+        @Override
+        public void resetLastSuccessfulTransfer() {
+            setLastSuccessfulTransfer(System.currentTimeMillis() - 
timestampDifference);
+        }
+    }
 }

Reply via email to