This is an automated email from the ASF dual-hosted git repository.
jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 5cc2790891 NIFI-10302 Streamlined TestMonitorActivity to improve
reliability
5cc2790891 is described below
commit 5cc2790891d23991f6cf23827757f77d9ab7e28f
Author: exceptionfactory <[email protected]>
AuthorDate: Fri Jul 29 16:12:44 2022 -0500
NIFI-10302 Streamlined TestMonitorActivity to improve reliability
- Removed TestRunner.run() loops
- Reduced usage of Thread.sleep()
- Upgraded TestMonitorActivity to JUnit 5
Signed-off-by: Joe Gresock <[email protected]>
This closes #6261.
---
.../processors/standard/TestMonitorActivity.java | 225 +++++++--------------
1 file changed, 71 insertions(+), 154 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java
index 667ffea0ce..c0382b8663 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java
@@ -30,20 +30,21 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestMonitorActivity {
@Test
- public void testFirstMessage() throws InterruptedException {
- final TestRunner runner = TestRunners.newTestRunner(new
TestableProcessor(1000L));
+ public void testFirstMessage() {
+ final TestableProcessor processor = new TestableProcessor(1000);
+ final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
runner.setProperty(MonitorActivity.THRESHOLD, "100 millis");
@@ -52,21 +53,12 @@ public class TestMonitorActivity {
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS, 1);
runner.clearTransferState();
- Thread.sleep(1000L);
+ processor.resetLastSuccessfulTransfer();
runNext(runner);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE, 1);
runner.clearTransferState();
- // ensure we don't keep creating the message
- for (int i = 0; i < 10; i++) {
- runNext(runner);
- runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
- runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
- runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED,
0);
- Thread.sleep(100L);
- }
-
Map<String, String> attributes = new HashMap<>();
attributes.put("key", "value");
attributes.put("key1", "value1");
@@ -79,20 +71,17 @@ public class TestMonitorActivity {
MockFlowFile restoredFlowFile =
runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED).get(0);
String flowFileContent = new String(restoredFlowFile.toByteArray());
- Assert.assertTrue(Pattern.matches("Activity restored at time: (.*)
after being inactive for 0 minutes", flowFileContent));
+ assertTrue(Pattern.matches("Activity restored at time: (.*) after
being inactive for 0 minutes", flowFileContent));
restoredFlowFile.assertAttributeNotExists("key");
restoredFlowFile.assertAttributeNotExists("key1");
runner.clearTransferState();
runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "true");
- Thread.sleep(200L);
- for (int i = 0; i < 10; i++) {
- runNext(runner);
- Thread.sleep(200L);
- }
+ processor.resetLastSuccessfulTransfer();
+ runNext(runner);
- runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 10);
+ runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
runner.clearTransferState();
@@ -106,14 +95,13 @@ public class TestMonitorActivity {
restoredFlowFile =
runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED).get(0);
flowFileContent = new String(restoredFlowFile.toByteArray());
- Assert.assertTrue(Pattern.matches("Activity restored at time: (.*)
after being inactive for 0 minutes", flowFileContent));
+ assertTrue(Pattern.matches("Activity restored at time: (.*) after
being inactive for 0 minutes", flowFileContent));
restoredFlowFile.assertAttributeNotExists("key");
restoredFlowFile.assertAttributeNotExists("key1");
}
@Test
public void testReconcileAfterFirstStartWhenLastSuccessIsAlreadySet()
throws Exception {
- // given
final String lastSuccessInCluster =
String.valueOf(System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(5));
final TestRunner runner = TestRunners.newTestRunner(new
TestableProcessor(0));
runner.setIsConfiguredForClustering(true);
@@ -124,11 +112,9 @@ public class TestMonitorActivity {
runner.setProperty(MonitorActivity.THRESHOLD, "5 secs");
runner.getStateManager().setState(Collections.singletonMap(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER,
lastSuccessInCluster), Scope.CLUSTER);
- // when
runner.enqueue("lorem ipsum");
runner.run(1, false);
- // then
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
final StateMap updatedState =
runner.getStateManager().getState(Scope.CLUSTER);
assertNotEquals(lastSuccessInCluster,
updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
@@ -136,7 +122,6 @@ public class TestMonitorActivity {
@Test
public void testReconcileWhenSharedStateIsNotYetSet() throws Exception {
- // given
final TestableProcessor processor = new TestableProcessor(0);
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setIsConfiguredForClustering(true);
@@ -146,110 +131,70 @@ public class TestMonitorActivity {
runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
runner.setProperty(MonitorActivity.THRESHOLD, "5 secs");
- // when
runner.setConnected(false);
runner.enqueue("lorem ipsum");
runner.run(1, false, false);
- // then
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
- // when
runner.setConnected(true);
runner.run(1, false, false);
- // then
final long tLocal = processor.getLatestSuccessTransfer();
final long tCluster = getLastSuccessFromCluster(runner);
assertEquals(tLocal, tCluster);
}
@Test
- public void testReconcileAfterReconnectWhenPrimary() throws Exception {
- // given
- final TestableProcessor processor = new TestableProcessor(0);
- final TestRunner runner = givenRunnerIsSetUpForReconcile(processor,
true);
+ public void testReconcileAfterReconnectWhenPrimary() throws
InterruptedException {
+ final TestRunner runner = getRunnerScopeCluster(new MonitorActivity(),
true);
- // when - First trigger will write last success transfer into cluster.
- Thread.sleep(8);
+ // First trigger will write last success transfer into cluster.
runner.enqueue("lorem ipsum");
runNext(runner);
- final long t1Local = processor.getLatestSuccessTransfer();
- final long t1Cluster = getLastSuccessFromCluster(runner);
- // then
- Assert.assertEquals(t1Local, t1Cluster);
- thenTransfersAre(runner, 1, 0, 0);
+ assertTransferCountSuccessInactiveRestored(runner, 1, 0);
- // when - At second trigger it's not connected, new last success
transfer stored only locally.
- Thread.sleep(20);
+ // At second trigger it's not connected, new last success transfer
stored only locally.
runner.setConnected(false);
runner.enqueue("lorem ipsum");
runNext(runner);
- final long t2Local = processor.getLatestSuccessTransfer();
- final long t2Cluster = getLastSuccessFromCluster(runner);
- // then
- Assert.assertNotEquals(t1Local, t2Local);
- Assert.assertEquals(t1Local, t2Cluster);
- thenTransfersAre(runner, 2, 0, 0);
+ assertTransferCountSuccessInactiveRestored(runner, 2, 0);
- // when - The third trigger is without flow file, but reconcile is
triggered and value is written to cluster.
- Thread.sleep(20);
+ // The third trigger is without flow file, but reconcile is triggered
and value is written to cluster.
runner.setConnected(true);
+ TimeUnit.MILLISECONDS.sleep(500);
runNext(runner);
- final long t3Local = processor.getLatestSuccessTransfer();
- final long t3Cluster = getLastSuccessFromCluster(runner);
- // then
- Assert.assertEquals(t3Local, t2Local);
- Assert.assertEquals(t3Cluster, t2Local);
// Inactive message is being sent after the connection is back.
- thenTransfersAre(runner,2, 1, 0);
+ assertTransferCountSuccessInactiveRestored(runner,2, 1);
}
@Test
- public void testReconcileAfterReconnectWhenNotPrimary() throws Exception {
- // given
- final TestableProcessor processor = new TestableProcessor(0);
- final TestRunner runner = givenRunnerIsSetUpForReconcile(processor,
false);
+ public void testReconcileAfterReconnectWhenNotPrimary() {
+ final TestableProcessor processor = new TestableProcessor(1000);
+ final TestRunner runner = getRunnerScopeCluster(processor, false);
- // when - First trigger will write last success transfer into cluster.
- Thread.sleep(8);
+ // First trigger will write last success transfer into cluster.
runner.enqueue("lorem ipsum");
runNext(runner);
- final long t1Local = processor.getLatestSuccessTransfer();
- final long t1Cluster = getLastSuccessFromCluster(runner);
- // then
- Assert.assertEquals(t1Local, t1Cluster);
- thenTransfersAre(runner, 1, 0, 0);
+ assertTransferCountSuccessInactiveRestored(runner, 1, 0);
- // when - At second trigger it's not connected, new last success
transfer stored only locally.
- Thread.sleep(20);
+ // At second trigger it's not connected, new last success transfer
stored only locally.
runner.setConnected(false);
runner.enqueue("lorem ipsum");
runNext(runner);
- final long t2Local = processor.getLatestSuccessTransfer();
- final long t2Cluster = getLastSuccessFromCluster(runner);
- // then
- Assert.assertNotEquals(t1Local, t2Local);
- Assert.assertEquals(t1Local, t2Cluster);
- thenTransfersAre(runner, 2, 0, 0);
+ assertTransferCountSuccessInactiveRestored(runner, 2, 0);
- // when - The third trigger is without flow file, but reconcile is
triggered and value is written to cluster.
- Thread.sleep(20);
+ // The third trigger is without flow file, but reconcile is triggered
and value is written to cluster.
runner.setConnected(true);
runNext(runner);
- final long t3Local = processor.getLatestSuccessTransfer();
- final long t3Cluster = getLastSuccessFromCluster(runner);
- // then
- Assert.assertEquals(t3Local, t2Local);
- Assert.assertEquals(t3Cluster, t2Local);
// No inactive message because the node is not primary
- thenTransfersAre(runner, 2, 0, 0);
+ assertTransferCountSuccessInactiveRestored(runner, 2, 0);
}
private void runNext(TestRunner runner) {
@@ -257,7 +202,7 @@ public class TestMonitorActivity {
runner.run(1, false, false);
}
- private TestRunner givenRunnerIsSetUpForReconcile(final TestableProcessor
processor, final boolean isPrimary) {
+ private TestRunner getRunnerScopeCluster(final MonitorActivity processor,
final boolean isPrimary) {
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setIsConfiguredForClustering(true);
runner.setPrimaryNode(isPrimary);
@@ -272,18 +217,14 @@ public class TestMonitorActivity {
return
Long.valueOf(runner.getStateManager().getState(Scope.CLUSTER).get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
}
- private void thenTransfersAre(TestRunner runner, final int success, final
int inactive, final int restored) {
- if (success > 0 && inactive == 0 & restored == 0) {
- runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
- }
+ private void assertTransferCountSuccessInactiveRestored(TestRunner runner,
final int success, final int inactive) {
runner.assertTransferCount(MonitorActivity.REL_SUCCESS, success);
runner.assertTransferCount(MonitorActivity.REL_INACTIVE, inactive);
- runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED,
restored);
+ runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
}
@Test
public void testNoReportingWhenDisconnected() {
- // given
final TestRunner runner = TestRunners.newTestRunner(new
TestableProcessor(TimeUnit.MINUTES.toMillis(5)));
runner.setIsConfiguredForClustering(true);
@@ -293,26 +234,23 @@ public class TestMonitorActivity {
runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
runner.setProperty(MonitorActivity.THRESHOLD, "3 minutes");
- // when
runner.setConnected(false);
runner.run(1, false);
- // then
runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
- // when
runner.setConnected(true);
runner.run(1, false);
- // then
runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
}
@Test
- public void testFirstMessageWithInherit() throws InterruptedException,
IOException {
- final TestRunner runner = TestRunners.newTestRunner(new
TestableProcessor(1000L));
+ public void testFirstMessageWithInherit() {
+ final TestableProcessor processor = new TestableProcessor(1000);
+ final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
runner.setProperty(MonitorActivity.THRESHOLD, "100 millis");
runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
@@ -323,21 +261,12 @@ public class TestMonitorActivity {
MockFlowFile originalFlowFile =
runner.getFlowFilesForRelationship(MonitorActivity.REL_SUCCESS).get(0);
runner.clearTransferState();
- Thread.sleep(1000L);
+ processor.resetLastSuccessfulTransfer();
runNext(runner);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE, 1);
runner.clearTransferState();
- // ensure we don't keep creating the message
- for (int i = 0; i < 10; i++) {
- runNext(runner);
- runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
- runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
- runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED,
0);
- Thread.sleep(100L);
- }
-
Map<String, String> attributes = new HashMap<>();
attributes.put("key", "value");
attributes.put("key1", "value1");
@@ -350,30 +279,23 @@ public class TestMonitorActivity {
MockFlowFile restoredFlowFile =
runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED).get(0);
String flowFileContent = new String(restoredFlowFile.toByteArray());
- Assert.assertTrue(Pattern.matches("Activity restored at time: (.*)
after being inactive for 0 minutes", flowFileContent));
+ assertTrue(Pattern.matches("Activity restored at time: (.*) after
being inactive for 0 minutes", flowFileContent));
restoredFlowFile.assertAttributeEquals("key", "value");
restoredFlowFile.assertAttributeEquals("key1", "value1");
// verify the UUIDs are not the same
restoredFlowFile.assertAttributeNotEquals(CoreAttributes.UUID.key(),
originalFlowFile.getAttribute(CoreAttributes.UUID.key()));
restoredFlowFile.assertAttributeNotEquals(CoreAttributes.FILENAME.key(),
originalFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
- Assert.assertTrue(
- String.format("file sizes match when they shouldn't
original=%1$s restored=%2$s",
- originalFlowFile.getSize(),
restoredFlowFile.getSize()), restoredFlowFile.getSize() !=
originalFlowFile.getSize());
- Assert.assertTrue(
- String.format("lineage start dates match when they shouldn't
original=%1$s restored=%2$s",
- originalFlowFile.getLineageStartDate(),
restoredFlowFile.getLineageStartDate()), restoredFlowFile.getLineageStartDate()
!= originalFlowFile.getLineageStartDate());
+ assertNotEquals(restoredFlowFile.getSize(),
originalFlowFile.getSize());
+ assertNotEquals(restoredFlowFile.getLineageStartDate(),
originalFlowFile.getLineageStartDate());
runner.clearTransferState();
runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "true");
- Thread.sleep(200L);
- for (int i = 0; i < 10; i++) {
- runNext(runner);
- Thread.sleep(200L);
- }
+ processor.resetLastSuccessfulTransfer();
+ runNext(runner);
- runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 10);
+ runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
runner.clearTransferState();
@@ -387,26 +309,23 @@ public class TestMonitorActivity {
restoredFlowFile =
runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED).get(0);
flowFileContent = new String(restoredFlowFile.toByteArray());
- Assert.assertTrue(Pattern.matches("Activity restored at time: (.*)
after being inactive for 0 minutes", flowFileContent));
+ assertTrue(Pattern.matches("Activity restored at time: (.*) after
being inactive for 0 minutes", flowFileContent));
restoredFlowFile.assertAttributeEquals("key", "value");
restoredFlowFile.assertAttributeEquals("key1", "value1");
restoredFlowFile.assertAttributeNotEquals(CoreAttributes.UUID.key(),
originalFlowFile.getAttribute(CoreAttributes.UUID.key()));
restoredFlowFile.assertAttributeNotEquals(CoreAttributes.FILENAME.key(),
originalFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
- Assert.assertTrue(
- String.format("file sizes match when they shouldn't
original=%1$s restored=%2$s",
- originalFlowFile.getSize(),
restoredFlowFile.getSize()), restoredFlowFile.getSize() !=
originalFlowFile.getSize());
- Assert.assertTrue(
- String.format("lineage start dates match when they shouldn't
original=%1$s restored=%2$s",
- originalFlowFile.getLineageStartDate(),
restoredFlowFile.getLineageStartDate()), restoredFlowFile.getLineageStartDate()
!= originalFlowFile.getLineageStartDate());
+ assertNotEquals(restoredFlowFile.getSize(),
originalFlowFile.getSize());
+ assertNotEquals(restoredFlowFile.getLineageStartDate(),
originalFlowFile.getLineageStartDate());
}
- @Test(timeout=5000)
- public void testFirstRunNoMessages() throws InterruptedException,
IOException {
+ @Timeout(5)
+ @Test
+ public void testFirstRunNoMessages() throws InterruptedException {
// don't use the TestableProcessor, we want the real timestamp from
@OnScheduled
final TestRunner runner = TestRunners.newTestRunner(new
MonitorActivity());
runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
int threshold = 100;
- boolean rerun = false;
+ boolean rerun;
do {
rerun = false;
runner.setProperty(MonitorActivity.THRESHOLD, threshold + "
millis");
@@ -436,7 +355,7 @@ public class TestMonitorActivity {
* Since each call to run() will call @OnScheduled methods which will set
the lastSuccessfulTransfer to the
* current time, we need a way to create an artificial time difference
between calls to run.
*/
- private class TestableProcessor extends MonitorActivity {
+ private static class TestableProcessor extends MonitorActivity {
private final long timestampDifference;
@@ -451,7 +370,7 @@ public class TestMonitorActivity {
}
@Test
- public void testClusterMonitorInvalidReportingNode() throws Exception {
+ public void testClusterMonitorInvalidReportingNode() {
final TestRunner runner = TestRunners.newTestRunner(new
TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
runner.setIsConfiguredForClustering(true);
@@ -478,7 +397,7 @@ public class TestMonitorActivity {
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
final StateMap updatedState =
runner.getStateManager().getState(Scope.CLUSTER);
- assertNotNull("Latest timestamp should be persisted",
updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+
assertNotNull(updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
// Should be null because COPY_ATTRIBUTES is null.
assertNull(updatedState.get("key1"));
assertNull(updatedState.get("key2"));
@@ -500,8 +419,7 @@ public class TestMonitorActivity {
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
final StateMap updatedState =
runner.getStateManager().getState(Scope.CLUSTER);
- assertNull("Latest timestamp should NOT be persisted, because it's
running as 'node' scope",
-
updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+
assertNull(updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
}
@Test
@@ -530,8 +448,7 @@ public class TestMonitorActivity {
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
final StateMap postProcessedState =
runner.getStateManager().getState(Scope.CLUSTER);
- assertTrue("Existing timestamp should be updated",
- existingTimestamp < Long.parseLong(postProcessedState.get(
+ assertTrue( existingTimestamp <
Long.parseLong(postProcessedState.get(
MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER)));
// State should be updated. Null in this case.
assertNull(postProcessedState.get("key1"));
@@ -564,7 +481,7 @@ public class TestMonitorActivity {
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
final StateMap postProcessedState =
runner.getStateManager().getState(Scope.CLUSTER);
- assertEquals("Existing timestamp should NOT be updated",
+ assertEquals(
String.valueOf(existingTimestamp),
postProcessedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
// State should stay the same.
@@ -592,13 +509,13 @@ public class TestMonitorActivity {
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
final StateMap updatedState =
runner.getStateManager().getState(Scope.CLUSTER);
- assertNotNull("Latest timestamp should be persisted",
updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+
assertNotNull(updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
assertEquals("value1", updatedState.get("key1"));
assertEquals("value2", updatedState.get("key2"));
}
@Test
- public void testClusterMonitorInactivity() throws Exception {
+ public void testClusterMonitorInactivity() {
final TestRunner runner = TestRunners.newTestRunner(new
TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
runner.setIsConfiguredForClustering(true);
runner.setPrimaryNode(false);
@@ -621,7 +538,7 @@ public class TestMonitorActivity {
}
@Test
- public void testClusterMonitorInactivityFallbackToNodeScope() throws
Exception {
+ public void testClusterMonitorInactivityFallbackToNodeScope() {
final TestRunner runner = TestRunners.newTestRunner(new
TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
runner.setIsConfiguredForClustering(false);
runner.setPrimaryNode(false);
@@ -644,7 +561,7 @@ public class TestMonitorActivity {
}
@Test
- public void testClusterMonitorInactivityOnPrimaryNode() throws Exception {
+ public void testClusterMonitorInactivityOnPrimaryNode() {
final TestableProcessor processor = new
TestableProcessor(TimeUnit.MINUTES.toMillis(120));
final TestRunner runner = TestRunners.newTestRunner(processor);
@@ -670,7 +587,7 @@ public class TestMonitorActivity {
}
@Test
- public void testClusterMonitorInactivityOnNode() throws Exception {
+ public void testClusterMonitorInactivityOnNode() {
final TestRunner runner = TestRunners.newTestRunner(new
TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
runner.setIsConfiguredForClustering(true);
runner.setPrimaryNode(false);
@@ -719,7 +636,7 @@ public class TestMonitorActivity {
// Latest activity should be persisted
final StateMap updatedState =
runner.getStateManager().getState(Scope.CLUSTER);
- assertNotNull("Latest timestamp should be persisted",
updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+
assertNotNull(updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
assertEquals("value1", updatedState.get("key1"));
assertEquals("value2", updatedState.get("key2"));
runner.clearTransferState();
@@ -753,7 +670,7 @@ public class TestMonitorActivity {
// Latest activity should be persisted
final StateMap updatedState =
runner.getStateManager().getState(Scope.CLUSTER);
- assertNotNull("Latest timestamp should be persisted",
updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+
assertNotNull(updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
assertEquals("value1", updatedState.get("key1"));
assertEquals("value2", updatedState.get("key2"));
runner.clearTransferState();
@@ -792,7 +709,7 @@ public class TestMonitorActivity {
// Latest activity should be persisted
final StateMap updatedState =
runner.getStateManager().getState(Scope.CLUSTER);
- assertNotNull("Latest timestamp should be persisted",
updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+
assertNotNull(updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
assertEquals("value1", updatedState.get("key1"));
assertEquals("value2", updatedState.get("key2"));
runner.clearTransferState();
@@ -831,7 +748,7 @@ public class TestMonitorActivity {
// Latest activity should NOT be persisted
final StateMap updatedState =
runner.getStateManager().getState(Scope.CLUSTER);
- assertNull("Latest timestamp should NOT be persisted",
updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
+
assertNull(updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
runner.clearTransferState();
}
@@ -861,7 +778,7 @@ public class TestMonitorActivity {
runNext(runner);
final List<MockFlowFile> successFiles =
runner.getFlowFilesForRelationship(MonitorActivity.REL_SUCCESS);
final List<MockFlowFile> activityRestoredFiles =
runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED);
- assertEquals("Should be zero since it doesn't have incoming file.", 0,
successFiles.size());
+ assertEquals(0, successFiles.size());
assertEquals(1, activityRestoredFiles.size());
assertEquals("value1",
activityRestoredFiles.get(0).getAttribute("key1"));
assertEquals("value2",
activityRestoredFiles.get(0).getAttribute("key2"));
@@ -898,7 +815,7 @@ public class TestMonitorActivity {
runNext(runner);
final List<MockFlowFile> successFiles =
runner.getFlowFilesForRelationship(MonitorActivity.REL_SUCCESS);
final List<MockFlowFile> activityRestoredFiles =
runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED);
- assertEquals("Should be zero since it doesn't have incoming file.", 0,
successFiles.size());
+ assertEquals(0, successFiles.size());
assertEquals(1, activityRestoredFiles.size());
assertEquals("value1",
activityRestoredFiles.get(0).getAttribute("key1"));
assertEquals("value2",
activityRestoredFiles.get(0).getAttribute("key2"));