This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 38b51b0dde NIFI-10037: When system test fails to clean up flow,
destroy the entire environment so that the next test starts in a healthy state.
Name troubleshooting directories with the name of the test class to avoid
ambiguity. Also added a log statement so that we know which test is running
when looking at the log output from the tests themselves. Finally, found an
issue in AbstractComponentNode in which we iterate over the elements in a Map
and call setProperty, which can update the underlying Map while it is being iterated over [...]
38b51b0dde is described below
commit 38b51b0dde24929563c4fc6c9a8c7a10e39ef713
Author: Mark Payne <[email protected]>
AuthorDate: Thu May 19 11:58:52 2022 -0400
NIFI-10037: When system test fails to clean up flow, destroy the entire
environment so that the next test starts in a healthy state. Name
troubleshooting directories with the name of the test class to avoid ambiguity.
Also added a log statement so that we know which test is running when looking
at the log output from the tests themselves. Finally, found an issue in
AbstractComponentNode in which we iterate over the elements in a Map and call
setProperty, which can update the underlying Map while it is being iterated over [...]
This closes #6059
Signed-off-by: David Handermann <[email protected]>
---
.../nifi/controller/AbstractComponentNode.java | 6 ++-
.../org/apache/nifi/tests/system/NiFiSystemIT.java | 8 ++++
.../tests/system/TroubleshootingTestWatcher.java | 11 +++--
.../tests/system/loadbalance/LoadBalanceIT.java | 54 ++++++++++++++++------
.../resources/conf/clustered/node1/nifi.properties | 2 +-
.../resources/conf/clustered/node2/bootstrap.conf | 2 +-
.../resources/conf/clustered/node2/nifi.properties | 2 +-
.../test/resources/conf/default/nifi.properties | 2 +-
8 files changed, 64 insertions(+), 23 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
index 59f9bd50c4..61d17eb4b8 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
@@ -66,6 +66,7 @@ import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
@@ -591,7 +592,10 @@ public abstract class AbstractComponentNode implements
ComponentNode {
// use setProperty instead of setProperties so we can bypass the class
loading logic.
// Consider value changed if it is different than the
PropertyDescriptor's default value because we need to call the
#onPropertiesModified
// method on the component if the current value is not the default
value, since the component itself is being reloaded.
- for (final Map.Entry<PropertyDescriptor, PropertyConfiguration> entry
: this.properties.entrySet()) {
+ // Also, create a copy of this.properties instead of iterating
directly over this.properties since the call to setProperty can change the
+ // underlying map, and the behavior of modifying the map while
iterating over its elements is undefined.
+ final Map<PropertyDescriptor, PropertyConfiguration>
copyOfPropertiesMap = new HashMap<>(this.properties);
+ for (final Map.Entry<PropertyDescriptor, PropertyConfiguration> entry
: copyOfPropertiesMap.entrySet()) {
final PropertyDescriptor propertyDescriptor = entry.getKey();
final PropertyConfiguration configuration = entry.getValue();
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
index 341e384812..298e3ec473 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
@@ -73,6 +73,8 @@ public abstract class NiFiSystemIT implements
NiFiInstanceProvider {
@BeforeEach
public void setup(final TestInfo testInfo) throws IOException {
this.testInfo = testInfo;
+ final String testClassName =
testInfo.getTestClass().map(Class::getSimpleName).orElse("<Unknown Test
Class>");
+ logger.info("Beginning Test {}:{}", testClassName,
testInfo.getDisplayName());
Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
setupClient();
@@ -116,6 +118,12 @@ public abstract class NiFiSystemIT implements
NiFiInstanceProvider {
if (isDestroyEnvironmentAfterEachTest()) {
cleanup();
+ } else if (destroyFlowFailure != null) {
+ // If unable to destroy the flow, we need to shut down the
instance, delete the flow, and completely recreate the environment.
+ // Otherwise, we will be left in an unknown state for the next
test, and that can cause cascading failures that are very difficult
+ // to understand and troubleshoot.
+ logger.info("Because there was a failure when destroying the
flow, will completely tear down the environments and start with a clean
environment for the next test.");
+ cleanup();
}
if (destroyFlowFailure != null) {
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/TroubleshootingTestWatcher.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/TroubleshootingTestWatcher.java
index 8d71362359..b2c1ab078f 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/TroubleshootingTestWatcher.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/TroubleshootingTestWatcher.java
@@ -40,7 +40,8 @@ public class TroubleshootingTestWatcher implements
TestWatcher {
final NiFiInstanceProvider provider = (NiFiInstanceProvider)
testInstance;
final String displayName = context.getDisplayName();
try {
- final File dir = quarantineTroubleshootingInfo(provider,
displayName, cause);
+ final String testClassName =
context.getTestClass().map(Class::getSimpleName).orElse("TestClassUnknown");
+ final File dir = quarantineTroubleshootingInfo(provider,
testClassName, displayName, cause);
logger.info("Test Failed [{}]: Troubleshooting information
stored [{}]", displayName, dir.getAbsolutePath());
} catch (final Exception e) {
logger.error("Test Failed [{}]: Troubleshooting
information not stored", displayName, e);
@@ -49,17 +50,21 @@ public class TroubleshootingTestWatcher implements
TestWatcher {
}
}
- private File quarantineTroubleshootingInfo(final NiFiInstanceProvider
provider, final String methodName, final Throwable failureCause) throws
IOException {
+ private File quarantineTroubleshootingInfo(final NiFiInstanceProvider
provider, final String testClassName, final String methodName, final Throwable
failureCause) throws IOException {
NiFiInstance instance = provider.getNiFiInstance();
// The teardown method may or may not have already run at this point.
If it has, the instance will be null.
// In that case, just create a new instance and use it - it will map
to the same directories.
if (instance == null) {
+ logger.warn("While capturing troubleshooting info for {}, the NiFi
Instance is not available. Will create a new one for Diagnostics purposes, but
some of the diagnostics may be less " +
+ "accurate, since it's not the same instance that ran the
test", methodName);
+
instance = provider.getInstanceFactory().createInstance();
}
final File troubleshooting = new File("target/troubleshooting");
- final File quarantineDir = new File(troubleshooting, methodName);
+ final String quarantineDirName = testClassName + "-" +
methodName.replace("()", "");
+ final File quarantineDir = new File(troubleshooting,
quarantineDirName);
quarantineDir.mkdirs();
instance.quarantineTroubleshootingInfo(quarantineDir, failureCause);
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java
index 2e1b1c0698..5e27f24737 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java
@@ -42,12 +42,13 @@ import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.Set;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class LoadBalanceIT extends NiFiSystemIT {
private final Logger logger = LoggerFactory.getLogger(getClass());
@@ -277,13 +278,13 @@ public class LoadBalanceIT extends NiFiSystemIT {
private int getQueueSize(final String connectionId) {
final ConnectionStatusEntity statusEntity =
getConnectionStatus(connectionId);
final ConnectionStatusDTO connectionStatusDto =
statusEntity.getConnectionStatus();
- return
connectionStatusDto.getAggregateSnapshot().getFlowFilesQueued().intValue();
+ return connectionStatusDto.getAggregateSnapshot().getFlowFilesQueued();
}
private long getQueueBytes(final String connectionId) {
final ConnectionStatusEntity statusEntity =
getConnectionStatus(connectionId);
final ConnectionStatusDTO connectionStatusDto =
statusEntity.getConnectionStatus();
- return
connectionStatusDto.getAggregateSnapshot().getBytesQueued().longValue();
+ return connectionStatusDto.getAggregateSnapshot().getBytesQueued();
}
private boolean isConnectionDoneLoadBalancing(final String connectionId) {
@@ -372,22 +373,45 @@ public class LoadBalanceIT extends NiFiSystemIT {
instance2.start(true);
waitForAllNodesConnected();
- // Generate the data again
generate =
getNifiClient().getProcessorClient().getProcessor(generate.getId());
- getNifiClient().getProcessorClient().startProcessor(generate);
- // Wait until all 20 FlowFiles are queued up
- waitFor(() -> {
- final ConnectionStatusEntity secondRoundStatusEntity =
getConnectionStatus(connection.getId());
- return
secondRoundStatusEntity.getConnectionStatus().getAggregateSnapshot().getFlowFilesQueued()
== 20;
- });
+ // Generate data and wait for it to be spread across the cluster. We
do this in an infinite while() loop because
+ // there can be a failure, in which case we'll retry. If that happens,
we just want to keep retrying until the test
+ // times out.
+ while (true) {
+ // Generate the data.
+ getNifiClient().getProcessorClient().startProcessor(generate);
- // Wait until load balancing is complete
- waitFor(() -> isConnectionDoneLoadBalancing(connection.getId()));
+ // Wait until all 20 FlowFiles are queued up
+ waitFor(() -> {
+ final ConnectionStatusEntity secondRoundStatusEntity =
getConnectionStatus(connection.getId());
+ return
secondRoundStatusEntity.getConnectionStatus().getAggregateSnapshot().getFlowFilesQueued()
== 20;
+ });
- // Ensure that the FlowFiles are evenly distributed between the nodes.
- final ConnectionStatusEntity afterSecondDataGenerationStatusEntity =
getConnectionStatus(connection.getId());
- assertTrue(isEvenlyDistributed(afterSecondDataGenerationStatusEntity));
+ // Wait until load balancing is complete
+ waitFor(() -> isConnectionDoneLoadBalancing(connection.getId()));
+
+ // Log the distribution of data between nodes for easier
troubleshooting in case there's a failure.
+ final ConnectionStatusEntity afterSecondDataGenerationStatusEntity
= getConnectionStatus(connection.getId());
+ final List<NodeConnectionStatusSnapshotDTO> nodeSnapshots =
afterSecondDataGenerationStatusEntity.getConnectionStatus().getNodeSnapshots();
+ logger.info("FlowFiles Queued Per Node:");
+ nodeSnapshots.forEach(snapshot ->
+ logger.info("{}:{} - {}", snapshot.getAddress(),
snapshot.getApiPort(), snapshot.getStatusSnapshot().getFlowFilesQueued())
+ );
+
+ // Check if the FlowFiles are evenly distributed between the
nodes. If so, we're done.
+ final boolean evenlyDistributed =
isEvenlyDistributed(afterSecondDataGenerationStatusEntity);
+ if (evenlyDistributed) {
+ break;
+ }
+
+ // If there's an IOException thrown while communicating between
the nodes, the data will be rebalanced and will go to
+ // the local partition. There's nothing we can do about that in
this test. However, we can verify that NiFi recovers
+ // from this and continues to distribute data. To do that, we will
stop the processor so that it can be started again
+ // (and produce more data) and we can empty the queue so that we
know how much data to expect.
+ getNifiClient().getProcessorClient().stopProcessor(generate);
+ getClientUtil().emptyQueue(connection.getId());
+ }
assertEquals(20, getQueueSize(connection.getId()));
assertEquals(20 * 1024 * 1024, getQueueBytes(connection.getId()));
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/nifi.properties
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/nifi.properties
index 4544f2fc3b..049ac178ef 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/nifi.properties
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/nifi.properties
@@ -77,7 +77,7 @@
nifi.content.repository.implementation=org.apache.nifi.controller.repository.Fil
nifi.content.claim.max.appendable.size=50 KB
nifi.content.repository.directory.default=./content_repository
nifi.content.repository.archive.max.retention.period=12 hours
-nifi.content.repository.archive.max.usage.percentage=50%
+nifi.content.repository.archive.max.usage.percentage=90%
nifi.content.repository.archive.enabled=true
nifi.content.repository.always.sync=false
nifi.content.viewer.url=../nifi-content-viewer/
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/bootstrap.conf
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/bootstrap.conf
index 930e9449db..80bd3ed93d 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/bootstrap.conf
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/bootstrap.conf
@@ -27,7 +27,7 @@ java.arg.3=-Xmx512m
java.arg.14=-Djava.awt.headless=true
-java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8003
+#java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8003
java.arg.nodeNum=-DnodeNumber=2
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/nifi.properties
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/nifi.properties
index acd5c6707c..4b7f644058 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/nifi.properties
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/nifi.properties
@@ -77,7 +77,7 @@
nifi.content.repository.implementation=org.apache.nifi.controller.repository.Fil
nifi.content.claim.max.appendable.size=50 KB
nifi.content.repository.directory.default=./content_repository
nifi.content.repository.archive.max.retention.period=12 hours
-nifi.content.repository.archive.max.usage.percentage=50%
+nifi.content.repository.archive.max.usage.percentage=90%
nifi.content.repository.archive.enabled=true
nifi.content.repository.always.sync=false
nifi.content.viewer.url=../nifi-content-viewer/
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/nifi.properties
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/nifi.properties
index 7b3de1452f..7c6426c67b 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/nifi.properties
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/nifi.properties
@@ -77,7 +77,7 @@
nifi.content.repository.implementation=org.apache.nifi.controller.repository.Fil
nifi.content.claim.max.appendable.size=50 KB
nifi.content.repository.directory.default=./content_repository
nifi.content.repository.archive.max.retention.period=12 hours
-nifi.content.repository.archive.max.usage.percentage=50%
+nifi.content.repository.archive.max.usage.percentage=90%
nifi.content.repository.archive.enabled=true
nifi.content.repository.always.sync=false
nifi.content.viewer.url=../nifi-content-viewer/